博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
批量日志入DataHub环境(非实时)
阅读量:4092 次
发布时间:2019-05-25

本文共 13879 字,大约阅读时间需要 46 分钟。

先说下需求吧,由于前置机每天产生数百G的日志报文,这些日志只有在现场反应出现问题之后,才会查找具体的日志,因此需要将前置机上面的大量日志转存到dataHub上(datahub会自动向odps(MaxCompute)转存),本来先要将入datahub代码融入前置机,这样前置直接实时往datahub写入,但是由于一些历史原因,不能这样做,最后考虑到业务上面对实时性要求不强自己便通过java程序实现了日志到datahub的转存——这个程序运行数月之后,现在已经停用,目前日志都是通过ELK实现实时保存了,不过在datahub到ELK的过度期中,这段程序圆满的完成了他的工作!

程序的结构图如下:

/** * 前置日志入dataHub入口程序 * 参数为日志所在文件夹目录(不以“/”结尾),参数可为多个日志目录,空格分开 * @author yangcheng * 2018年5月8日 */public class GetDataV2 {	public static ConcurrentHashMap
localCache = new ConcurrentHashMap
(); public static void main(String[] args) { System.out.println("服务器可用线程数:"+Runtime.getRuntime().availableProcessors());// DateFormat format = new SimpleDateFormat("HH"); DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); int threadNum = 16;//Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNum, 100, 1, TimeUnit.MINUTES, new LinkedBlockingQueue
()); LinkedBlockingQueue
fileNameQueue = new LinkedBlockingQueue
(); executor.execute(new DealWithFailure(args[0], fileNameQueue)); while(true){ //打印当前时间 System.out.println("开始执行时间:"+format2.format(new Date())); //计数器-计算每天0点30分到1点30分之间报文总数 AtomicLong counter = new AtomicLong(0); DatahubClient client = DataHubUtil.getClient(); CountDownLatch cuCountDownLatch =new CountDownLatch(threadNum); for (String path : args) { String filePath = path;//路径 System.out.println("开始执行。。。。。。。。。。。。。。。。。。"+filePath+"——————启用线程数:"+threadNum); //获取客户端 生产数据 try { File file = new File(filePath); String[] files = file.list(); //执行更名操作 for (String fileName : files) { if(fileName.endsWith(".log")){ System.out.println(".log: "+fileName); continue; } if(!isQz(fileName)){ System.out.println("isQz: "+fileName); continue; } String pathName = filePath+"/"+fileName; String newPathName = filePath+"/back_"+fileName; String[] arg = {"/bin/sh","-c","mv "+pathName+" "+ newPathName}; Runtime.getRuntime().exec(arg); //更名之后的文件名 fileNameQueue.put(newPathName); //newPathName } } catch (Exception e) { e.printStackTrace(); } } for(int i = 0; i < threadNum ; i++){ executor.execute(new Consumer(cuCountDownLatch,fileNameQueue, client,localCache)); } try { cuCountDownLatch.await(); getNum(localCache); System.out.println("执行结束................"); } catch (InterruptedException e1) { e1.printStackTrace(); }finally{ //关闭资源 DataHubUtil.closeAll(client); } try { Thread.sleep(1000*60*30); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void getNum(ConcurrentHashMap
localCache2) { Set
keySet = localCache2.keySet(); for(String key : keySet){ Integer value = localCache2.get(key); System.out.println("在 "+key+"时间点段"+"=============有"+value+"条"); } } public static boolean isQz(String fileName){ if(fileName.startsWith("confirm") ||fileName.startsWith("event") ){ return true; } return false; }
/** * 前置日志入dataHub工具类 * @author yangcheng * 2018年5月8日 */public class DataHubUtil {	private static final String endpoint="";//	private static final String project="";	private static final String topic= "";  //面向对象  qz_698_day_real_monitor	private static final String accessId="";	private static final String accessKey="";		/**	 * 获取datahub客户端	 * 客户端使用完成之后,必须执行closeAll()方法关闭资源	 * @return DatahubClient	 */	public static DatahubClient getClient(){		AliyunAccount account = new AliyunAccount(accessId, accessKey);		DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);		//client		DatahubClient client = new DatahubClient(conf);		return client;	}		/**	 * 添加数据到datahub	 * @return 0表示成功,非0表示失败	 */	public static void add2DataHub(DatahubClient client, String shardId ,List
dataList){ //record String threadName = Thread.currentThread().getName(); RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); List
recordEntries = new ArrayList
(); for (Datas data : dataList) { RecordEntry entry = new RecordEntry(schema); entry.setString("area_code", data.getArea_code()); entry.setString("terminal_addr", data.getTerminal_addr()); entry.setString("afn", data.getAfn()); entry.setString("fn", data.getFn()); entry.setString("content", data.getContent()); entry.setString("m_type", data.getP_Type()); entry.setTimeStampInDate("insert_time", data.getInsert_time()); entry.setString("data_date", data.getDataDate()); entry.setShardId(shardId); recordEntries.add(entry); } try { Thread.sleep(1000);//一秒执行一次 } catch (InterruptedException e) { e.printStackTrace(); } int i = client.putRecords(project, topic, recordEntries).getFailedRecordCount(); if(i>0){ System.out.println(threadName+"首次上传失败,失败数为=="+i+"两秒之后重复上传"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } retry(client,recordEntries,3); } } /** * 当发现失败之后 重试机制 * @param recordEntries */ public static void retry(DatahubClient client,List
recordEntries,int retryTimes){ boolean suc = false; String threadName = Thread.currentThread().getName(); while(retryTimes != 0){ retryTimes = retryTimes - 1 ; int resulFNum = client.putRecords(project, topic, recordEntries).getFailedRecordCount(); if(resulFNum > 0){ System.out.println(threadName+"重复上传至dataHub......"); continue; } suc = true; System.out.println(threadName+"重复上传至dataHub成功......"); break; } //重复上传失败 if(!suc){ System.out.println(threadName+"重复上传至dataHub失败......"); throw new RuntimeException("retryFailure"); } } /** * 获取数据 * @param client 客户端 * @param shardId 通道编号 * @param timeStart 日志起始时间 * @param limitNum 获取日志条数 */ public static void getRecord(DatahubClient client , String shardId,Long timeStart,int limitNum){ RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); GetCursorResult cursorResult=client.getCursor(project, topic, shardId, timeStart); GetRecordsResult cursorResul=client.getRecords(project, topic, shardId, cursorResult.getCursor(), limitNum, schema); for (RecordEntry element : cursorResul.getRecords()) { System.out.println(element.toJsonNode()); } } /** * 根据行政区码和终端地址获取通道编号 * @param area_code * @param terminal_addr * @return 通道编号 */ public static String getShardId(String area_code,String terminal_addr){ int id=(area_code+terminal_addr).hashCode() % 10 ; return String.valueOf(id); } /** * 关闭dataHub资源 * @param DatahubClient 需要关闭的DatahubClient */ public static void closeAll(DatahubClient... client){ for (DatahubClient cl : client) { try { cl.close(); } catch (Exception e) { if(cl != null){ cl = null; }// e.printStackTrace(); } } } public static void closeFileStream(BufferedReader... brs){ for (BufferedReader bufferedReader : brs) { try { bufferedReader.close(); } catch (IOException e) { if(bufferedReader != null){ bufferedReader=null; } e.printStackTrace(); } } } /** * 批量入dataHub * @param client * @param dataList 数据集合 */ public static void addBacthData(DatahubClient client, List
dataList) { //record RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); List
recordEntries = new ArrayList
(); for (Datas data : dataList) { RecordEntry entry = new RecordEntry(schema); entry.setString("area_code", data.getArea_code()); entry.setString("terminal_addr", data.getTerminal_addr()); entry.setString("afn", data.getAfn()); entry.setString("fn", data.getFn()); entry.setString("content", data.getContent()); entry.setString("m_type", data.getP_Type()); entry.setTimeStampInDate("insert_time", data.getInsert_time()); entry.setString("data_date", data.getDataDate()); entry.setShardId(data.getShardId()); recordEntries.add(entry); } client.putRecords(project, topic, recordEntries).getFailedRecordCount(); }}
/** * 线程01 * @author yangcheng   * @date 2018年11月20日   * @version V1.0 */public class DealWithFailure implements Runnable{	String filePath = null;	//Cosumer线程获取文件名的队列	LinkedBlockingQueue
fileNameQueue = null; public DealWithFailure(String filePath,LinkedBlockingQueue
fileNameQueue) { // System.out.println("守护线程启动"); this.fileNameQueue = fileNameQueue; this.filePath = filePath; } @Override public void run() { String surThreadName = Thread.currentThread().getName(); while(true){ try { File file = new File(filePath); String[] files = file.list(); //执行更名操作 for (String fileName : files) { if(fileName.contains("_failed")){ //获取到的所有读取失败的文件全路径名 又扔回队列中 fileNameQueue.put(filePath+"/"+fileName); System.out.println(surThreadName+"获取到并重新添加到队列中的的执行失败的文件=="+filePath+"/"+fileName); } } Thread.sleep(1000*60*31);//保证在work执行sleep的区间中,该线程只执行一次 } catch (Exception e) { e.printStackTrace(); } } }}
/** * worker线程 * 用于多线程入dataHub * 读取被改名之后的文件 * @author yangcheng * 2018年5月17日 */public class Consumer implements Runnable{	LinkedBlockingQueue
fileNameQueue = null; DatahubClient client = null; CountDownLatch cuCountDownLatch = null; //计数器用于统计指定时间段内的数据 否则需要清空才可以 AtomicLong counter = null; ConcurrentHashMap
localCache = null; Map
hashMap = Collections.synchronizedMap(new HashMap
()); public Consumer(CountDownLatch cuCountDownLatch, LinkedBlockingQueue
fileNameQueue,DatahubClient client,AtomicLong counter) { this .fileNameQueue = fileNameQueue; this.client = client; this.cuCountDownLatch = cuCountDownLatch; this.counter = counter; } /** * 以小时为单位缓存执行结果 * @param cuCountDownLatch * @param fileNameQueue * @param client * @param localCache */ public Consumer(CountDownLatch cuCountDownLatch, LinkedBlockingQueue
fileNameQueue,DatahubClient client,ConcurrentHashMap
localCache ) { this .fileNameQueue = fileNameQueue; this.client = client; this.cuCountDownLatch = cuCountDownLatch; this.localCache = localCache; } @Override public void run() { String filePathName = null; AtomicLong upFailedNum = new AtomicLong(); try{ while((filePathName=this.fileNameQueue.poll()) != null){ System.out.println(Thread.currentThread().getName()+"开始读取fileName======================="+filePathName); Date beginDate =new Date(); FileReader fr = null; try { fr = new FileReader(filePathName); } catch (FileNotFoundException e1) { e1.printStackTrace(); continue; } BufferedReader br = new BufferedReader(fr); String line = null; List
dataList = new ArrayList
(); DateFormat dataDateFormat = new SimpleDateFormat("yyyyMMdd"); String dataDate = dataDateFormat.format(beginDate); //获取通道编号 String shardId=String.valueOf(RandomUtils.nextInt(0,9)); boolean isCanBeRmove = true ; String exceptionMsg = null; try { while((line = br.readLine()) != null ){ if(this.localCache != null ){ if(line.contains("AFN=13;FN=161")){ String timeStr = line.substring(1, 14); methodYQ(timeStr); } } Datas data = new Datas(); data.setContent(line); data.setDataDate(dataDate); dataList.add(data); if(dataList.size() % 1000 == 0){ //添加记录 DataHubUtil.add2DataHub(client, shardId ,dataList); dataList = new ArrayList
(); } } //剩余不满1000的数据一次提交 if(dataList.size() != 0){ //添加记录 DataHubUtil.add2DataHub(client, shardId ,dataList); } } catch (Exception e) { exceptionMsg = e.getMessage(); String threadName = Thread.currentThread().getName(); System.out.println(threadName+"捕获到dataHub上传异常==="+exceptionMsg); isCanBeRmove = false ; // DataHubUtil.closeFileStream(br); e.printStackTrace(); }finally{ DataHubUtil.closeFileStream(br); } //删除已经读过的日志文件 Date endDate =new Date(); System.out.println(Thread.currentThread().getName()+"已读取文件名称:"+filePathName+" ; 开始时间:"+beginDate+"结束时间:"+endDate+".................."); String pathName = filePathName; String[] arg = {"/bin/sh","-c","rm "+pathName}; if(isCanBeRmove && exceptionMsg == null){ try { Runtime.getRuntime().exec(arg); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"删除文件:"+pathName); }else{ long thisNum = upFailedNum.getAndIncrement();//防止重名 System.out.println(Thread.currentThread().getName()+"未删除文件:"+pathName+";已更名为:"+pathName+"_failed"+thisNum); if("retryFailure".equals(exceptionMsg)){ String[] args = {"/bin/sh","-c","mv "+pathName+" "+ pathName+"_failed"+thisNum}; try { Runtime.getRuntime().exec(args); } catch (IOException e) { e.printStackTrace(); } } } } }finally{ //将本类的计数转存到主程序的缓存中、 methodYQ2(hashMap); cuCountDownLatch.countDown(); } } /** * 将本类的计数转存到主程序的缓存中、 * @param map * @param timeStr */ private void methodYQ2(Map
hashMap2) { if(localCache!=null){ Set
keySet = hashMap2.keySet(); for (String key : keySet) { if(localCache.containsKey(key)){ Integer ii = hashMap2.get(key); localCache.put(key, ii+localCache.get(key)); }else{ localCache.put(key, hashMap2.get(key)); } } } } /** * 判断时间字符串在指定的时间断内 * @return */ private boolean strDog(String str){ String s = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); String start = " 00:30:00"; String end = " 01:30:00"; int a = (s+start).compareTo(str); int b = str.compareTo(s+end); return (str.contains(s) && a<0 && b<0) ? true : false; } /** * 根据时间点存Map * @return */ private void methodYQ(String timeStr){ if(hashMap!=null){ if(hashMap.containsKey(timeStr)){ Integer i = hashMap.get(timeStr); hashMap.put(timeStr, i+1); }else{ hashMap.put(timeStr,1); } } }}

程序编写过程中的感想:

一开始觉得datahub是大数据环境,尤其是阿里开发的,所以对他的吞吐量和性能严重高估,觉得datahub多大的并发都能扛住,实时证明,实际使用过程中会有很多不确定性,然后datahub也没有想想中的那么能抗,我这个程序当时部署了20个,且每个程序开16个线程,运行了一段时间没有发现什么问题,程序一直不报错,结果统计数据库中日志丢了大量数据,一开始怀疑是程序读取日志部分有错,最后通过增加计数器发现,固定时间段内程序从log中读取的报文数量是对的,但入datahub之后就不对了,最后通过查询API发现,putRecords有一个失败数量返回值,这个一开始没有发现,最后才知道 ,这个方法执行失败是不报错的,而是返回当前执行失败的数量,于是我加上了重试机制!程序再次发布之后,通过日志发现,基本上百分之80以上的执行过程都会走到重试机制---后续通过配置shardid以及datahub的参数,情况有所好转。

转载地址:http://upcii.baihongyu.com/

你可能感兴趣的文章
DirectX11 聚光灯
查看>>
DirectX11 HLSL打包(packing)格式和“pad”变量的必要性
查看>>
DirectX11 光照演示示例Demo
查看>>
漫谈一下前端的可视化技术
查看>>
VUe+webpack构建单页router应用(一)
查看>>
Vue+webpack构建单页router应用(二)
查看>>
从头开始讲Node.js——异步与事件驱动
查看>>
Node.js-模块和包
查看>>
Node.js核心模块
查看>>
express的应用
查看>>
NodeJS开发指南——mongoDB、Session
查看>>
Express: Can’t set headers after they are sent.
查看>>
2017年,这一次我们不聊技术
查看>>
实现接口创建线程
查看>>
Java对象序列化与反序列化(1)
查看>>
HTML5的表单验证实例
查看>>
JavaScript入门笔记:全选功能的实现
查看>>
程序设计方法概述:从面相对象到面向功能到面向对象
查看>>
数据库事务
查看>>
JavaScript基础1:JavaScript 错误 - Throw、Try 和 Catch
查看>>