记一次生产Kafka出口流量暴涨问题

流量暴涨

接到告警,业务的Kafka出口流量异常,这个kafka一直是订单类在用,平时插入的数据不会达到这个规模。其次,入口带宽很少,出口带宽这么多,掐指一算,大概率是消费的服务出现问题了。一般入口带宽和出口带宽 不是消费组特别多的情况下,两个的带宽相差不会太悬殊。

查找凶手

iftop 看了一下大致流量情况

image-20220128145022454

发现主要流量流出到3.843.85这两台机

tcpdump -i eth0 host <kafka ip> and host <xxx.xxx.3.84> -w test.cap 抓取了几秒这两个主机相关的包

基本确定了流量使用多的端口,主要是 3.141:9093(Kafka) 发送给 3.84:15630 的流量比较多。

tcp流

看了下(3.84:15630 -> 3.141:9093 )的内容 基本可以确定是哪个topic和消费组。

不抓包的话 用netstat来简单过滤下,如果两个主机间建立的连接比较少的话,基本也能定位出是哪个端口,如下

1
2
3
[root@xxx-xxx-3.141 ~]#netstat -ano|grep xxx.xxx.3.84
tcp 0 0 xxx.xxx.3.141:9093 xxx.xxx.3.84:15556 ESTABLISHED keepalive (4.24/0/0)
tcp 0 4128896 xxx.xxx.3.141:9093 xxx.xxx.3.84:15630 ESTABLISHED on (0.20/0/0)

接下来就需要找出3.84:15630是哪个服务监听的端口。

到3.84这台机执行 lsof -i:端口 得到进程ID

然后用 ps aux | grep 或者 lsof -p 找出具体进程信息。

1
2
3
4
5
6
7
[root@xxx.xxx.3.84 ~]#lsof -i:15630
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 3047564 java 442u IPv4 77454498 0t0 TCP xxx.xxx.3.84:15630->xxx.xxx.3.141:copycat (ESTABLISHED)

[root@xxx.xxx.3.84 ~]#ps aux|grep 3047564
root 2518948 0.0 0.0 112712 956 pts/2 S+ 15:14 0:00 grep --color=auto 3047564
java 3047564 99.7 28.5 9016636 2212048 ? Ssl Jan14 8903:50 /usr/java/jdk1.8.0_202/bin/java <手动打码进程名>

确定了进程后,找到维护该服务的开发者,咨询了下,该服务有哪些消费组。和前面抓包的数据印证。

在kafka center观察了下,该消费组有少量积压的数据。其实topic几秒新增数据在个位数,但是发送的数据如此之多,基本断定消费者拿了消息,可能处理出现了问题,并没有返回ack。

拿着消费组名字去代码搜了搜相关消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void consumer(@Payload String msg , Acknowledgment ack) throws IOException {
Optional<String> kafkaMessage = Optional.ofNullable(msg);
if(kafkaMessage.isPresent()){
String message = kafkaMessage.get();
if(Optional.ofNullable(message).isPresent()){
//开始判断业务内容数据
//判断业务标识
JsonNode jsonNode = objectMapper.readTree(message);
String businessMark = jsonNode.get("business_mark").asText();
if(!DUSINESS_MARK.equals(businessMark)){
ack.acknowledge();
return;
}
//判断操作类型
String action = jsonNode.get("action").asText();
if(!DiscussDataEnum.DISCUSS_DATA_ADD.getType().equals(action) && !DiscussDataEnum.DISCUSS_DATA_EDIT.getType().equals(action)){
ack.acknowledge();
return;
}
//开始序列化对象kafka原始报文
DiscussOriginalInfo discussOriginalInfo = JacksonUtil.getObjectMapper().readValue(message, new TypeReference<DiscussOriginalInfo>(){});
if(null == discussOriginalInfo){
return;
}
log.info("===========> discuss data message kafkaMessage = {}", message);
//开始序列化评论内容报文
DiscussContentInfo discussContentInfo = JacksonUtil.getObjectMapper().readValue(discussOriginalInfo.getContent(), new TypeReference<DiscussContentInfo>(){});
//将数据字符串转化形成对象
discussOriginalInfo.setDiscussContentInfo(discussContentInfo);
//开始进入对队列处理器
aliDiscussService.AliDiscussProccess(discussOriginalInfo);
}
}
ack.acknowledge();
}

简单看了下,有序列化和反序列相关的代码,这部分是相当容易出问题的,如果数据不符合规范,出问题就常见的。

而这里没有异常捕捉和数据合规校验相关代码,有些判断不符合预期,return的时候也没返回ack。

基本断定是因为数据处理异常,没有返回ack,导致消息重复被拉取消费,不断循环。

和相关开发同志反馈,最后加入相关异常捕捉代码,问题得以解决。

从下图可以看出,4点左右上线新版本后,断层式下降。

网卡出口流量

但是也发现另外一个问题,貌似连接数有点高。平时大家都是创建了线程池来读写kafka,而据我所知连接这个kafka的程序并没有很多,怎么有高达9k左右的连接数?

tcp连接数

带着介个问题,于是又有了另外一个故障处理,请看下一篇 《Kafka连接数过多问题排查》

关注公众号 尹安灿