2021-07-01

Java Kafka 消费积压监控

Java Kafka 消费积压监控

后端代码:

Monitor.java代码:

package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.DecimalFormat;import java.text.SimpleDateFormat;import java.util.*;/** * kafka消费监控 * * @author suxiang */public class Monitor { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private String servers; private String topic; private String groupId; private long lastTime; private long lastTotalLag = 0L; private long lastLogSize = 0L; private long lastOffset = 0L; private double lastRatio = 0; private long speedLogSize = 0L; private long speedOffset = 0L; private String time; private List<ConsumerInfo> list; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String getTime() {  return time; } public void setTime(String time) {  this.time = time; } public long getLastTotalLag() {  return lastTotalLag; } public double getLastRatio() {  return lastRatio; } public String getTopic() {  return topic; } public String getGroupId() {  return groupId; } public long getSpeedLogSize() {  return speedLogSize; } public long getSpeedOffset() {  return speedOffset; } public List<ConsumerInfo> getList() {  return list; } public void setList(List<ConsumerInfo> list) {  this.list = list; } private KafkaConsumer<String, String> consumer; private List<TopicPartition> topicPartitionList; private final DecimalFormat decimalFormat = new DecimalFormat("0.00"); public Monitor(String servers, String topic, String groupId) {  this.servers = servers;  this.topic = topic;  this.groupId = groupId;  this.list = new ArrayList<>();  //消费者  Properties properties = new Properties();  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);  properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  consumer = new KafkaConsumer<String, String>(properties);  //查询 topic partitions  topicPartitionList = new ArrayList<>();  List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);  for (PartitionInfo partitionInfo : partitionInfoList) {   TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());   topicPartitionList.add(topicPartition);  } } public void monitor(boolean addToList) {  try {   long startTime = System.currentTimeMillis();   //查询 log size   Map<Integer, Long> endOffsetMap = new HashMap<>();   Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);   for (TopicPartition partitionInfo : endOffsets.keySet()) {    endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));   }   //查询消费 offset   Map<Integer, Long> commitOffsetMap = new HashMap<>();   for (TopicPartition topicAndPartition : topicPartitionList) {    OffsetAndMetadata committed = consumer.committed(topicAndPartition);    commitOffsetMap.put(topicAndPartition.partition(), committed.offset());   }   long endTime = System.currentTimeMillis();   log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");   startTime = System.currentTimeMillis();   //累加lag   long totalLag = 0L;   long logSize = 0L;   long offset = 0L;   if (endOffsetMap.size() == commitOffsetMap.size()) {    for (Integer partition : endOffsetMap.keySet()) {     long endOffset = endOffsetMap.get(partition);     long commitOffset = commitOffsetMap.get(partition);     long diffOffset = endOffset - commitOffset;     totalLag += diffOffset;     logSize += endOffset;     offset += commitOffset;    }   } else {    log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost");   }   log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag);   if (lastTime > 0) {    if (System.currentTimeMillis() - lastTime > 0) {     speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0));     speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0));    }    if (speedLogSize > 0) {     String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));     lastRatio = Double.parseDouble(strRatio);     log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%");    }   }   lastTime = System.currentTimeMillis();   lastTotalLag = totalLag;   lastLogSize = logSize;   lastOffset = offset;   endTime = System.currentTimeMillis();   log.info("计算耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");   if (addToList) {    this.setTime(simpleDateFormat.format(new Date()));    this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));    if (this.list.size() > 500) {     this.list.remove(0);    }   }  } catch (Exception e) {   log.error("Monitor error", e);  } }}

View Code

MonitorService.java代码:

package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.*;@Servicepublic class MonitorService { private static final Logger log = LoggerFactory.getLogger(MonitorService.class); @Value("${kafka.consumer.servers}") private String servers; private Monitor monitor; private List<Monitor> monitorList; @PostConstruct private void Init() {  monitorList = new ArrayList<>();  monitorList.add(new Monitor(servers, "wifiData", "wifi-kafka-hbase"));  monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE"));  monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn"));  monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001"));  monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19"));  monitorList.add(new Monitor(servers, "motorVehicle", "unifiedstorage-downloader"));  monitorList.add(new Monitor(servers, "motorVehicle", "full-vehicle-data-storage-kafka2ch"));  monitorList.add(new Monitor(servers, "motorVehicle", "vehicle_store"));  monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-luyang"));  monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-yaohai"));  monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-baohe"));  monitorList.add(new Monitor(servers, "peopleFace", "kafka-filter-check-19")); } public void monitorOnce(boolean addToList) {  for (Monitor monitor : monitorList) {   monitor.monitor(addToList);  } } public List<ConsumerInfo> getConsumerList() {  List<ConsumerInfo> list = new ArrayList<>();  for (Monitor monitor : monitorList) {   list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime()));  }  return list; } public List<ConsumerInfo> getDetails(String topic, String groupId) {  for (Monitor monitor : monitorList) {   if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) {    return monitor.getList();   }  }  return new ArrayList<>(); }}

View Code

MonitorConfig.java代码:

package com.suncreate.kafkaConsumerMonitor.task;import com.suncreate.kafkaConsumerMonitor.service.MonitorService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.SchedulingConfigurer;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.scheduling.support.CronTrigger;import java.text.SimpleDateFormat;@Configuration@EnableSchedulingpublic class MonitorConfig implements SchedulingConfigurer { private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class); private String cronExpression = "0 */3 * * * ?"; //private String cronExpression = "*/20 * * * * ?"; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Autowired private MonitorService monitorService; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {  taskRegistrar.addTriggerTask(() -> {   monitorService.monitorOnce(true);  }, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext)); }}

View Code

MonitorController.java代码:

package com.suncreate.kafkaConsumerMonitor.controller;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import com.suncreate.kafkaConsumerMonitor.model.LayuiData;import com.suncreate.kafkaConsumerMonitor.service.MonitorService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController@RequestMapping("/monitor")public class MonitorController { @Autowired private MonitorService monitorService; @GetMapping("/getConsumers") public LayuiData getConsumers() {  List<ConsumerInfo> list = monitorService.getConsumerList();  LayuiData data = new LayuiData(list);  return data; } @GetMapping("/monitorOnce") public void monitorOnce() {  monitorService.monitorOnce(false); } @GetMapping("/getDetails") public LayuiData getDetails(String topic, String groupId) {  List<ConsumerInfo> list = monitorService.getDetails(topic, groupId);  LayuiData data = new LayuiData(list);  return data; }}

View Code

pom.

<??><project ="http://maven.apache.org/POM/4.0.0" ="http://www.w3.org/2001/   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 >> <modelVersion>4.0.0</modelVersion> <parent>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-parent</artifactId>  <version>2.1.6.RELEASE</version>  <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.suncreate</groupId> <artifactId>kafka-consumer-monitor</artifactId> <version>1.0</version> <name>kafka-consumer-monitor</name> <description>Kafka消费积压监控预警</description> <properties>  <java.version>1.8</java.version>  <elasticsearch.version>6.1.4</elasticsearch.version> </properties> <dependencies>  <dependency>   <groupId>org.projectlombok</groupId>   <artifactId>lombok</artifactId>   <version>1.18.12</version>  </dependency>  <dependency>   <groupId>com.alibaba</groupId>   <artifactId>fastjson</artifactId>   <version>1.2.54</version>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-web</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-test</artifactId>   <scope>test</scope>  </dependency>  <dependency>   <groupId>org.springframework.kafka</groupId>   <artifactId>spring-kafka-test</artifactId>   <scope>test</scope>  </dependency>  <dependency>   <groupId>com.google.code.gson</groupId>   <artifactId>gson</artifactId>   <version>2.8.0</version>  </dependency>  <dependency>   <groupId>org.postgresql</groupId>   <artifactId>postgresql</artifactId>   <scope>runtime</scope>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-jdbc</artifactId>  </dependency>  <dependency>   <groupId>org.elasticsearch.client</groupId>   <artifactId>elasticsearch-rest-high-level-client</artifactId>   <version>6.1.4</version>  </dependency>  <dependency>   <groupId>com.oracle</groupId>   <artifactId>ojdbc6</artifactId>   <version>11.1.0.7.0</version>  </dependency>  <dependency>   <groupId>org.apache.kafka</groupId>   <artifactId>kafka_2.11</artifactId>   <version>0.11.0.1</version>  </dependency>  <dependency>   <groupId>org.apache.kafka</groupId>   <artifactId>kafka-clients</artifactId>   <version>0.11.0.1</version>  </dependency> </dependencies> <build>  <plugins>   <plugin>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-maven-plugin</artifactId>   </plugin>   <plugin>    <groupId>org.apache.maven.plugins</groupId>    <artifactId>maven-compiler-plugin</artifactId>    <configuration>     <source>8</source>     <target>8</target>    </configuration>   </plugin>  </plugins> </build></project>

View Code

 

 

前端使用了 Layui 和 ECharts 展示表格和图表

index.css代码:

.div-title { font-size: 18px; margin-top: 10px; margin-left: 10px;}.div-right { text-align: right;}.span-red { color: #ff0000;}

View Code

index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):

<!DOCTYPE html><html lang="zh"><head> <meta charset="UTF-8"> <title>Title</title> <link rel="stylesheet" href="css/index.css"> <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"> <script type="text/javascript" src="js/jquery-1.7.1.js"></script> <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script></head><body><div class="div-title">Kafka 监控 <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button></div><table class="layui-hide" id="myTable"></table><script type="text/javascript"> var myTable; layui.use('table', function () {  var table = layui.table;  myTable = table.render({   elem: '#myTable',   url: '/home/monitor/getConsumers',   cellMinWidth: 80, //全局定义常规单元格的最小宽度   cols: [[    {field: 'topic', width: 300, title: 'topic', sort: true},    {field: 'groupId', width: 300, title: 'groupId'},    {     field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) {      if (d.delayDay * 24 > 2) {       return '<div ><span >' + d.totalLag + '</span></div>'      } else {       return '<div ><span>' + d.totalLag + '</span></div>'      }     }    },    {     field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {      return '<div >' + d.speedLogSize + '</div>'     }    },    {     field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: fu......

原文转载:http://www.shaoqun.com/a/839144.html

跨境电商:https://www.ikjzd.com/

DMM:https://www.ikjzd.com/w/2026

netporter:https://www.ikjzd.com/w/2132

淘粉吧怎么返利:https://www.ikjzd.com/w/1725


JavaKafka消费积压监控后端代码:Monitor.java代码:packagecom.suncreate.kafkaConsumerMonitor.service;importcom.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;importorg.apache.kafka.clients.consumer.ConsumerConfig;
薇美铺:https://www.ikjzd.com/w/2312
大卖纷纷布局、价值$1180亿的拉美电商市场,对你来说还有机会吗?:https://www.ikjzd.com/articles/106680
如何快速提升Youtube权重?收好这4招!:https://www.ikjzd.com/articles/106682
shopee店铺如何免费推广?:https://www.ikjzd.com/articles/106742
姐夫放狠话:众卖家别慌,朕的江山至少还能再稳六十年!:https://www.ikjzd.com/articles/106744
口述:保姆给孩子喂奶 老公伸手摸胸:http://lady.shaoqun.com/a/15161.html
换妻 老婆还是别人的好?:http://lady.shaoqun.com/m/a/107395.html
姐夫垂涎小姨子美色 趁其喂奶不备之时兽性大发实施强奸:http://lady.shaoqun.com/m/a/77085.html
985大一新生性别比堪忧,校长喊脱单,男生家长坐不住:http://lady.shaoqun.com/a/394610.html
什么样的女人最好泡,不用说自己来:http://lady.shaoqun.com/a/394611.html
大学情侣中出现"怀孕"现象的原因大多是这些原因:http://lady.shaoqun.com/a/394612.html
毒恋班:大学恋爱班应该教什么?|观点:http://lady.shaoqun.com/a/394613.html

No comments:

Post a Comment