Java Kafka 消费积压监控
后端代码:
Monitor.java代码:
package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.DecimalFormat;import java.text.SimpleDateFormat;import java.util.*;/** * kafka消费监控 * * @author suxiang */public class Monitor { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private String servers; private String topic; private String groupId; private long lastTime; private long lastTotalLag = 0L; private long lastLogSize = 0L; private long lastOffset = 0L; private double lastRatio = 0; private long speedLogSize = 0L; private long speedOffset = 0L; private String time; private List<ConsumerInfo> list; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String getTime() { return time; } public void setTime(String time) { this.time = time; } public long getLastTotalLag() { return lastTotalLag; } public double getLastRatio() { return lastRatio; } public String getTopic() { return topic; } public String getGroupId() { return groupId; } public long getSpeedLogSize() { return speedLogSize; } public long getSpeedOffset() { return speedOffset; } public List<ConsumerInfo> getList() { return list; } public void setList(List<ConsumerInfo> list) { this.list = list; } private KafkaConsumer<String, String> consumer; private List<TopicPartition> topicPartitionList; private final DecimalFormat decimalFormat = new DecimalFormat("0.00"); public Monitor(String servers, String topic, String groupId) { this.servers = servers; this.topic = topic; this.groupId = groupId; this.list = new ArrayList<>(); //消费者 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumer = new KafkaConsumer<String, String>(properties); //查询 topic partitions topicPartitionList = new ArrayList<>(); List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfoList) { TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); topicPartitionList.add(topicPartition); } } public void monitor(boolean addToList) { try { long startTime = System.currentTimeMillis(); //查询 log size Map<Integer, Long> endOffsetMap = new HashMap<>(); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList); for (TopicPartition partitionInfo : endOffsets.keySet()) { endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo)); } //查询消费 offset Map<Integer, Long> commitOffsetMap = new HashMap<>(); for (TopicPartition topicAndPartition : topicPartitionList) { OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); } long endTime = System.currentTimeMillis(); log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒"); startTime = System.currentTimeMillis(); //累加lag long totalLag = 0L; long logSize = 0L; long offset = 0L; if (endOffsetMap.size() == commitOffsetMap.size()) { for (Integer partition : endOffsetMap.keySet()) { long endOffset = endOffsetMap.get(partition); long commitOffset = commitOffsetMap.get(partition); long diffOffset = endOffset - commitOffset; totalLag += diffOffset; logSize += endOffset; offset += commitOffset; } } else { log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost"); } log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag); if (lastTime > 0) { if (System.currentTimeMillis() - lastTime > 0) { speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0)); speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0)); } if (speedLogSize > 0) { String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0)); lastRatio = Double.parseDouble(strRatio); log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%"); } } lastTime = System.currentTimeMillis(); lastTotalLag = totalLag; lastLogSize = logSize; lastOffset = offset; endTime = System.currentTimeMillis(); log.info("计算耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒"); if (addToList) { this.setTime(simpleDateFormat.format(new Date())); this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime())); if (this.list.size() > 500) { this.list.remove(0); } } } catch (Exception e) { log.error("Monitor error", e); } }}
View Code
MonitorService.java代码:
package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.*;@Servicepublic class MonitorService { private static final Logger log = LoggerFactory.getLogger(MonitorService.class); @Value("${kafka.consumer.servers}") private String servers; private Monitor monitor; private List<Monitor> monitorList; @PostConstruct private void Init() { monitorList = new ArrayList<>(); monitorList.add(new Monitor(servers, "wifiData", "wifi-kafka-hbase")); monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE")); monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn")); monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001")); monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19")); monitorList.add(new Monitor(servers, "motorVehicle", "unifiedstorage-downloader")); monitorList.add(new Monitor(servers, "motorVehicle", "full-vehicle-data-storage-kafka2ch")); monitorList.add(new Monitor(servers, "motorVehicle", "vehicle_store")); monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-luyang")); monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-yaohai")); monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-baohe")); monitorList.add(new Monitor(servers, "peopleFace", "kafka-filter-check-19")); } public void monitorOnce(boolean addToList) { for (Monitor monitor : monitorList) { monitor.monitor(addToList); } } public List<ConsumerInfo> getConsumerList() { List<ConsumerInfo> list = new ArrayList<>(); for (Monitor monitor : monitorList) { list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime())); } return list; } public List<ConsumerInfo> getDetails(String topic, String groupId) { for (Monitor monitor : monitorList) { if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) { return monitor.getList(); } } return new ArrayList<>(); }}
View Code
MonitorConfig.java代码:
package com.suncreate.kafkaConsumerMonitor.task;import com.suncreate.kafkaConsumerMonitor.service.MonitorService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.SchedulingConfigurer;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.scheduling.support.CronTrigger;import java.text.SimpleDateFormat;@Configuration@EnableSchedulingpublic class MonitorConfig implements SchedulingConfigurer { private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class); private String cronExpression = "0 */3 * * * ?"; //private String cronExpression = "*/20 * * * * ?"; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Autowired private MonitorService monitorService; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addTriggerTask(() -> { monitorService.monitorOnce(true); }, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext)); }}
View Code
MonitorController.java代码:
package com.suncreate.kafkaConsumerMonitor.controller;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;import com.suncreate.kafkaConsumerMonitor.model.LayuiData;import com.suncreate.kafkaConsumerMonitor.service.MonitorService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController@RequestMapping("/monitor")public class MonitorController { @Autowired private MonitorService monitorService; @GetMapping("/getConsumers") public LayuiData getConsumers() { List<ConsumerInfo> list = monitorService.getConsumerList(); LayuiData data = new LayuiData(list); return data; } @GetMapping("/monitorOnce") public void monitorOnce() { monitorService.monitorOnce(false); } @GetMapping("/getDetails") public LayuiData getDetails(String topic, String groupId) { List<ConsumerInfo> list = monitorService.getDetails(topic, groupId); LayuiData data = new LayuiData(list); return data; }}
View Code
pom.
<??><project ="http://maven.apache.org/POM/4.0.0" ="http://www.w3.org/2001/ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 >> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.suncreate</groupId> <artifactId>kafka-consumer-monitor</artifactId> <version>1.0</version> <name>kafka-consumer-monitor</name> <description>Kafka消费积压监控预警</description> <properties> <java.version>1.8</java.version> <elasticsearch.version>6.1.4</elasticsearch.version> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.1.4</version> </dependency> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>11.1.0.7.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build></project>
View Code
前端使用了 Layui 和 ECharts 展示表格和图表
index.css代码:
.div-title { font-size: 18px; margin-top: 10px; margin-left: 10px;}.div-right { text-align: right;}.span-red { color: #ff0000;}
View Code
index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):
<!DOCTYPE html><html lang="zh"><head> <meta charset="UTF-8"> <title>Title</title> <link rel="stylesheet" href="css/index.css"> <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"> <script type="text/javascript" src="js/jquery-1.7.1.js"></script> <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script></head><body><div class="div-title">Kafka 监控 <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button></div><table class="layui-hide" id="myTable"></table><script type="text/javascript"> var myTable; layui.use('table', function () { var table = layui.table; myTable = table.render({ elem: '#myTable', url: '/home/monitor/getConsumers', cellMinWidth: 80, //全局定义常规单元格的最小宽度 cols: [[ {field: 'topic', width: 300, title: 'topic', sort: true}, {field: 'groupId', width: 300, title: 'groupId'}, { field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) { if (d.delayDay * 24 > 2) { return '<div ><span >' + d.totalLag + '</span></div>' } else { return '<div ><span>' + d.totalLag + '</span></div>' } } }, { field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) { return '<div >' + d.speedLogSize + '</div>' } }, { field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: fu......原文转载:http://www.shaoqun.com/a/839144.html
跨境电商:https://www.ikjzd.com/
DMM:https://www.ikjzd.com/w/2026
netporter:https://www.ikjzd.com/w/2132
淘粉吧怎么返利:https://www.ikjzd.com/w/1725
JavaKafka消费积压监控后端代码:Monitor.java代码:packagecom.suncreate.kafkaConsumerMonitor.service;importcom.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;importorg.apache.kafka.clients.consumer.ConsumerConfig;
薇美铺:https://www.ikjzd.com/w/2312
大卖纷纷布局、价值$1180亿的拉美电商市场,对你来说还有机会吗?:https://www.ikjzd.com/articles/106680
如何快速提升Youtube权重?收好这4招!:https://www.ikjzd.com/articles/106682
shopee店铺如何免费推广?:https://www.ikjzd.com/articles/106742
姐夫放狠话:众卖家别慌,朕的江山至少还能再稳六十年!:https://www.ikjzd.com/articles/106744
口述:保姆给孩子喂奶 老公伸手摸胸:http://lady.shaoqun.com/a/15161.html
换妻 老婆还是别人的好?:http://lady.shaoqun.com/m/a/107395.html
姐夫垂涎小姨子美色 趁其喂奶不备之时兽性大发实施强奸:http://lady.shaoqun.com/m/a/77085.html
985大一新生性别比堪忧,校长喊脱单,男生家长坐不住:http://lady.shaoqun.com/a/394610.html
什么样的女人最好泡,不用说自己来:http://lady.shaoqun.com/a/394611.html
大学情侣中出现"怀孕"现象的原因大多是这些原因:http://lady.shaoqun.com/a/394612.html
毒恋班:大学恋爱班应该教什么?|观点:http://lady.shaoqun.com/a/394613.html
No comments:
Post a Comment