本文共 13879 字,大约阅读时间需要 46 分钟。
先说下需求吧:前置机每天产生数百 G 的日志报文,这些日志只有在现场反映出现问题之后,才会被用来查找具体原因,因此需要将前置机上的大量日志转存到 DataHub 上(DataHub 会自动向 ODPS(MaxCompute)转存)。本来计划将写入 DataHub 的代码融入前置机,让前置机直接实时往 DataHub 写入,但由于一些历史原因不能这样做;考虑到业务对实时性要求不强,我便通过 Java 程序实现了日志到 DataHub 的转存。这个程序运行数月之后现已停用,目前日志都是通过 ELK 实现实时保存了,不过在 DataHub 到 ELK 的过渡期中,这段程序圆满地完成了它的工作!
程序的结构图如下:
/** * 前置日志入dataHub入口程序 * 参数为日志所在文件夹目录(不以“/”结尾),参数可为多个日志目录,空格分开 * @author yangcheng * 2018年5月8日 */public class GetDataV2 { public static ConcurrentHashMaplocalCache = new ConcurrentHashMap (); public static void main(String[] args) { System.out.println("服务器可用线程数:"+Runtime.getRuntime().availableProcessors());// DateFormat format = new SimpleDateFormat("HH"); DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); int threadNum = 16;//Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNum, 100, 1, TimeUnit.MINUTES, new LinkedBlockingQueue ()); LinkedBlockingQueue fileNameQueue = new LinkedBlockingQueue (); executor.execute(new DealWithFailure(args[0], fileNameQueue)); while(true){ //打印当前时间 System.out.println("开始执行时间:"+format2.format(new Date())); //计数器-计算每天0点30分到1点30分之间报文总数 AtomicLong counter = new AtomicLong(0); DatahubClient client = DataHubUtil.getClient(); CountDownLatch cuCountDownLatch =new CountDownLatch(threadNum); for (String path : args) { String filePath = path;//路径 System.out.println("开始执行。。。。。。。。。。。。。。。。。。"+filePath+"——————启用线程数:"+threadNum); //获取客户端 生产数据 try { File file = new File(filePath); String[] files = file.list(); //执行更名操作 for (String fileName : files) { if(fileName.endsWith(".log")){ System.out.println(".log: "+fileName); continue; } if(!isQz(fileName)){ System.out.println("isQz: "+fileName); continue; } String pathName = filePath+"/"+fileName; String newPathName = filePath+"/back_"+fileName; String[] arg = {"/bin/sh","-c","mv "+pathName+" "+ newPathName}; Runtime.getRuntime().exec(arg); //更名之后的文件名 fileNameQueue.put(newPathName); //newPathName } } catch (Exception e) { e.printStackTrace(); } } for(int i = 0; i < threadNum ; i++){ executor.execute(new Consumer(cuCountDownLatch,fileNameQueue, client,localCache)); } try { cuCountDownLatch.await(); getNum(localCache); System.out.println("执行结束................"); } catch (InterruptedException e1) { e1.printStackTrace(); }finally{ //关闭资源 
DataHubUtil.closeAll(client); } try { Thread.sleep(1000*60*30); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void getNum(ConcurrentHashMap localCache2) { Set keySet = localCache2.keySet(); for(String key : keySet){ Integer value = localCache2.get(key); System.out.println("在 "+key+"时间点段"+"=============有"+value+"条"); } } public static boolean isQz(String fileName){ if(fileName.startsWith("confirm") ||fileName.startsWith("event") ){ return true; } return false; }
/** * 前置日志入dataHub工具类 * @author yangcheng * 2018年5月8日 */public class DataHubUtil { private static final String endpoint="";// private static final String project=""; private static final String topic= ""; //面向对象 qz_698_day_real_monitor private static final String accessId=""; private static final String accessKey=""; /** * 获取datahub客户端 * 客户端使用完成之后,必须执行closeAll()方法关闭资源 * @return DatahubClient */ public static DatahubClient getClient(){ AliyunAccount account = new AliyunAccount(accessId, accessKey); DatahubConfiguration conf = new DatahubConfiguration(account, endpoint); //client DatahubClient client = new DatahubClient(conf); return client; } /** * 添加数据到datahub * @return 0表示成功,非0表示失败 */ public static void add2DataHub(DatahubClient client, String shardId ,ListdataList){ //record String threadName = Thread.currentThread().getName(); RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); List recordEntries = new ArrayList (); for (Datas data : dataList) { RecordEntry entry = new RecordEntry(schema); entry.setString("area_code", data.getArea_code()); entry.setString("terminal_addr", data.getTerminal_addr()); entry.setString("afn", data.getAfn()); entry.setString("fn", data.getFn()); entry.setString("content", data.getContent()); entry.setString("m_type", data.getP_Type()); entry.setTimeStampInDate("insert_time", data.getInsert_time()); entry.setString("data_date", data.getDataDate()); entry.setShardId(shardId); recordEntries.add(entry); } try { Thread.sleep(1000);//一秒执行一次 } catch (InterruptedException e) { e.printStackTrace(); } int i = client.putRecords(project, topic, recordEntries).getFailedRecordCount(); if(i>0){ System.out.println(threadName+"首次上传失败,失败数为=="+i+"两秒之后重复上传"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } retry(client,recordEntries,3); } } /** * 当发现失败之后 重试机制 * @param recordEntries */ public static void retry(DatahubClient client,List recordEntries,int retryTimes){ boolean suc = false; String 
threadName = Thread.currentThread().getName(); while(retryTimes != 0){ retryTimes = retryTimes - 1 ; int resulFNum = client.putRecords(project, topic, recordEntries).getFailedRecordCount(); if(resulFNum > 0){ System.out.println(threadName+"重复上传至dataHub......"); continue; } suc = true; System.out.println(threadName+"重复上传至dataHub成功......"); break; } //重复上传失败 if(!suc){ System.out.println(threadName+"重复上传至dataHub失败......"); throw new RuntimeException("retryFailure"); } } /** * 获取数据 * @param client 客户端 * @param shardId 通道编号 * @param timeStart 日志起始时间 * @param limitNum 获取日志条数 */ public static void getRecord(DatahubClient client , String shardId,Long timeStart,int limitNum){ RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); GetCursorResult cursorResult=client.getCursor(project, topic, shardId, timeStart); GetRecordsResult cursorResul=client.getRecords(project, topic, shardId, cursorResult.getCursor(), limitNum, schema); for (RecordEntry element : cursorResul.getRecords()) { System.out.println(element.toJsonNode()); } } /** * 根据行政区码和终端地址获取通道编号 * @param area_code * @param terminal_addr * @return 通道编号 */ public static String getShardId(String area_code,String terminal_addr){ int id=(area_code+terminal_addr).hashCode() % 10 ; return String.valueOf(id); } /** * 关闭dataHub资源 * @param DatahubClient 需要关闭的DatahubClient */ public static void closeAll(DatahubClient... client){ for (DatahubClient cl : client) { try { cl.close(); } catch (Exception e) { if(cl != null){ cl = null; }// e.printStackTrace(); } } } public static void closeFileStream(BufferedReader... 
brs){ for (BufferedReader bufferedReader : brs) { try { bufferedReader.close(); } catch (IOException e) { if(bufferedReader != null){ bufferedReader=null; } e.printStackTrace(); } } } /** * 批量入dataHub * @param client * @param dataList 数据集合 */ public static void addBacthData(DatahubClient client, List dataList) { //record RecordSchema schema = client.getTopic(project, topic).getRecordSchema(); List recordEntries = new ArrayList (); for (Datas data : dataList) { RecordEntry entry = new RecordEntry(schema); entry.setString("area_code", data.getArea_code()); entry.setString("terminal_addr", data.getTerminal_addr()); entry.setString("afn", data.getAfn()); entry.setString("fn", data.getFn()); entry.setString("content", data.getContent()); entry.setString("m_type", data.getP_Type()); entry.setTimeStampInDate("insert_time", data.getInsert_time()); entry.setString("data_date", data.getDataDate()); entry.setShardId(data.getShardId()); recordEntries.add(entry); } client.putRecords(project, topic, recordEntries).getFailedRecordCount(); }}
/** * 线程01 * @author yangcheng * @date 2018年11月20日 * @version V1.0 */public class DealWithFailure implements Runnable{ String filePath = null; //Cosumer线程获取文件名的队列 LinkedBlockingQueuefileNameQueue = null; public DealWithFailure(String filePath,LinkedBlockingQueue fileNameQueue) { // System.out.println("守护线程启动"); this.fileNameQueue = fileNameQueue; this.filePath = filePath; } @Override public void run() { String surThreadName = Thread.currentThread().getName(); while(true){ try { File file = new File(filePath); String[] files = file.list(); //执行更名操作 for (String fileName : files) { if(fileName.contains("_failed")){ //获取到的所有读取失败的文件全路径名 又扔回队列中 fileNameQueue.put(filePath+"/"+fileName); System.out.println(surThreadName+"获取到并重新添加到队列中的的执行失败的文件=="+filePath+"/"+fileName); } } Thread.sleep(1000*60*31);//保证在work执行sleep的区间中,该线程只执行一次 } catch (Exception e) { e.printStackTrace(); } } }}
/** * worker线程 * 用于多线程入dataHub * 读取被改名之后的文件 * @author yangcheng * 2018年5月17日 */public class Consumer implements Runnable{ LinkedBlockingQueuefileNameQueue = null; DatahubClient client = null; CountDownLatch cuCountDownLatch = null; //计数器用于统计指定时间段内的数据 否则需要清空才可以 AtomicLong counter = null; ConcurrentHashMap localCache = null; Map hashMap = Collections.synchronizedMap(new HashMap ()); public Consumer(CountDownLatch cuCountDownLatch, LinkedBlockingQueue fileNameQueue,DatahubClient client,AtomicLong counter) { this .fileNameQueue = fileNameQueue; this.client = client; this.cuCountDownLatch = cuCountDownLatch; this.counter = counter; } /** * 以小时为单位缓存执行结果 * @param cuCountDownLatch * @param fileNameQueue * @param client * @param localCache */ public Consumer(CountDownLatch cuCountDownLatch, LinkedBlockingQueue fileNameQueue,DatahubClient client,ConcurrentHashMap localCache ) { this .fileNameQueue = fileNameQueue; this.client = client; this.cuCountDownLatch = cuCountDownLatch; this.localCache = localCache; } @Override public void run() { String filePathName = null; AtomicLong upFailedNum = new AtomicLong(); try{ while((filePathName=this.fileNameQueue.poll()) != null){ System.out.println(Thread.currentThread().getName()+"开始读取fileName======================="+filePathName); Date beginDate =new Date(); FileReader fr = null; try { fr = new FileReader(filePathName); } catch (FileNotFoundException e1) { e1.printStackTrace(); continue; } BufferedReader br = new BufferedReader(fr); String line = null; List dataList = new ArrayList (); DateFormat dataDateFormat = new SimpleDateFormat("yyyyMMdd"); String dataDate = dataDateFormat.format(beginDate); //获取通道编号 String shardId=String.valueOf(RandomUtils.nextInt(0,9)); boolean isCanBeRmove = true ; String exceptionMsg = null; try { while((line = br.readLine()) != null ){ if(this.localCache != null ){ if(line.contains("AFN=13;FN=161")){ String timeStr = line.substring(1, 14); methodYQ(timeStr); } } Datas data = new Datas(); 
data.setContent(line); data.setDataDate(dataDate); dataList.add(data); if(dataList.size() % 1000 == 0){ //添加记录 DataHubUtil.add2DataHub(client, shardId ,dataList); dataList = new ArrayList (); } } //剩余不满1000的数据一次提交 if(dataList.size() != 0){ //添加记录 DataHubUtil.add2DataHub(client, shardId ,dataList); } } catch (Exception e) { exceptionMsg = e.getMessage(); String threadName = Thread.currentThread().getName(); System.out.println(threadName+"捕获到dataHub上传异常==="+exceptionMsg); isCanBeRmove = false ; // DataHubUtil.closeFileStream(br); e.printStackTrace(); }finally{ DataHubUtil.closeFileStream(br); } //删除已经读过的日志文件 Date endDate =new Date(); System.out.println(Thread.currentThread().getName()+"已读取文件名称:"+filePathName+" ; 开始时间:"+beginDate+"结束时间:"+endDate+".................."); String pathName = filePathName; String[] arg = {"/bin/sh","-c","rm "+pathName}; if(isCanBeRmove && exceptionMsg == null){ try { Runtime.getRuntime().exec(arg); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"删除文件:"+pathName); }else{ long thisNum = upFailedNum.getAndIncrement();//防止重名 System.out.println(Thread.currentThread().getName()+"未删除文件:"+pathName+";已更名为:"+pathName+"_failed"+thisNum); if("retryFailure".equals(exceptionMsg)){ String[] args = {"/bin/sh","-c","mv "+pathName+" "+ pathName+"_failed"+thisNum}; try { Runtime.getRuntime().exec(args); } catch (IOException e) { e.printStackTrace(); } } } } }finally{ //将本类的计数转存到主程序的缓存中、 methodYQ2(hashMap); cuCountDownLatch.countDown(); } } /** * 将本类的计数转存到主程序的缓存中、 * @param map * @param timeStr */ private void methodYQ2(Map hashMap2) { if(localCache!=null){ Set keySet = hashMap2.keySet(); for (String key : keySet) { if(localCache.containsKey(key)){ Integer ii = hashMap2.get(key); localCache.put(key, ii+localCache.get(key)); }else{ localCache.put(key, hashMap2.get(key)); } } } } /** * 判断时间字符串在指定的时间断内 * @return */ private boolean strDog(String str){ String s = new SimpleDateFormat("yyyy-MM-dd").format(new 
Date()); String start = " 00:30:00"; String end = " 01:30:00"; int a = (s+start).compareTo(str); int b = str.compareTo(s+end); return (str.contains(s) && a<0 && b<0) ? true : false; } /** * 根据时间点存Map * @return */ private void methodYQ(String timeStr){ if(hashMap!=null){ if(hashMap.containsKey(timeStr)){ Integer i = hashMap.get(timeStr); hashMap.put(timeStr, i+1); }else{ hashMap.put(timeStr,1); } } }}
程序编写过程中的感想:
一开始觉得 DataHub 是大数据环境,尤其是阿里开发的,所以对它的吞吐量和性能严重高估,觉得 DataHub 多大的并发都能扛住。事实证明,实际使用过程中会有很多不确定性,DataHub 也没有想象中的那么能扛。我这个程序当时部署了 20 个,且每个程序开 16 个线程,运行了一段时间没有发现什么问题,程序一直不报错,结果统计数据库中日志丢了大量数据。一开始怀疑是程序读取日志部分有错,最后通过增加计数器发现,固定时间段内程序从 log 中读取的报文数量是对的,但入 DataHub 之后就不对了。最后通过查询 API 发现,putRecords 有一个失败数量返回值,这个一开始没有发现,最后才知道:这个方法执行失败是不报错的,而是返回当前执行失败的数量,于是我加上了重试机制!程序再次发布之后,通过日志发现,基本上 80% 以上的执行过程都会走到重试机制——后续通过配置 shardId 以及 DataHub 的参数,情况有所好转。
转载地址:http://upcii.baihongyu.com/