接到告警,业务的Kafka出口流量异常,这个kafka一直是订单类在用,平时插入的数据不会达到这个规模。其次,入口带宽很少,出口带宽这么多,掐指一算,大概率是消费的服务出现问题了。一般入口带宽和出口带宽 不是消费组特别多的情况下,两个的带宽相差不会太悬殊。
查找凶手
用 iftop
看了一下大致流量情况
发现主要流量流出到3.84和3.85这两台机
用 tcpdump -i eth0 host <kafka ip> and host <xxx.xxx.3.84> -w test.cap
抓取了几秒这两个主机相关的包
基本确定了流量使用多的端口,主要是 3.141:9093(Kafka) 发送给 3.84:15630 的流量比较多。
看了下(3.84:15630 -> 3.141:9093 )的内容 基本可以确定是哪个topic和消费组。
不抓包的话 用netstat来简单过滤下,如果两个主机间建立的连接比较少的话,基本也能定位出是哪个端口,如下
1 2 3
| [root@xxx-xxx-3.141 ~] tcp 0 0 xxx.xxx.3.141:9093 xxx.xxx.3.84:15556 ESTABLISHED keepalive (4.24/0/0) tcp 0 4128896 xxx.xxx.3.141:9093 xxx.xxx.3.84:15630 ESTABLISHED on (0.20/0/0)
|
接下来就需要找出3.84:15630是哪个服务监听的端口。
到3.84这台机执行 lsof -i:端口 得到进程ID
然后用 ps aux | grep 或者 lsof -p 找出具体进程信息。
1 2 3 4 5 6 7
| [root@xxx.xxx.3.84 ~]#lsof -i:15630 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 3047564 java 442u IPv4 77454498 0t0 TCP xxx.xxx.3.84:15630->xxx.xxx.3.141:copycat (ESTABLISHED)
[root@xxx.xxx.3.84 ~]#ps aux|grep 3047564 root 2518948 0.0 0.0 112712 956 pts/2 S+ 15:14 0:00 grep --color=auto 3047564 java 3047564 99.7 28.5 9016636 2212048 ? Ssl Jan14 8903:50 /usr/java/jdk1.8.0_202/bin/java <手动打码进程名>
|
确定了进程后,找到维护该服务的开发者,咨询了下,该服务有哪些消费组。和前面抓包的数据印证。
在kafka center观察了下,该消费组有少量积压的数据。其实topic几秒新增数据在个位数,但是发送的数据如此之多,基本断定消费者拿了消息,可能处理出现了问题,并没有返回ack。
拿着消费组名字去代码搜了搜相关消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public void consumer(@Payload String msg , Acknowledgment ack) throws IOException { Optional<String> kafkaMessage = Optional.ofNullable(msg); if(kafkaMessage.isPresent()){ String message = kafkaMessage.get(); if(Optional.ofNullable(message).isPresent()){ JsonNode jsonNode = objectMapper.readTree(message); String businessMark = jsonNode.get("business_mark").asText(); if(!DUSINESS_MARK.equals(businessMark)){ ack.acknowledge(); return; } String action = jsonNode.get("action").asText(); if(!DiscussDataEnum.DISCUSS_DATA_ADD.getType().equals(action) && !DiscussDataEnum.DISCUSS_DATA_EDIT.getType().equals(action)){ ack.acknowledge(); return; } DiscussOriginalInfo discussOriginalInfo = JacksonUtil.getObjectMapper().readValue(message, new TypeReference<DiscussOriginalInfo>(){}); if(null == discussOriginalInfo){ return; } log.info("===========> discuss data message kafkaMessage = {}", message); DiscussContentInfo discussContentInfo = JacksonUtil.getObjectMapper().readValue(discussOriginalInfo.getContent(), new TypeReference<DiscussContentInfo>(){}); discussOriginalInfo.setDiscussContentInfo(discussContentInfo); aliDiscussService.AliDiscussProccess(discussOriginalInfo); } } ack.acknowledge(); }
|
简单看了下,有序列化和反序列相关的代码,这部分是相当容易出问题的,如果数据不符合规范,出问题就常见的。
而这里没有异常捕捉和数据合规校验相关代码,有些判断不符合预期,return的时候也没返回ack。
基本断定是因为数据处理异常,没有返回ack,导致消息重复被拉取消费,不断循环。
和相关开发同志反馈,最后加入相关异常捕捉代码,问题得以解决。
从下图可以看出,4点左右上线新版本后,断层式下降。
但是也发现另外一个问题,貌似连接数有点高。平时大家都是创建了线程池来读写kafka,而据我所知连接这个kafka的程序并没有很多,怎么有高达9k左右的连接数?
带着介个问题,于是又有了另外一个故障处理,请看下一篇 《Kafka连接数过多问题排查》