更新时间:2022-06-19 04:44:55
kafka 2.11 attempt to heart beat failed since the group is rebalancing
生产环境遇到kafka 2.11 重平衡问题,记录
为了解决问题,先还原此报错
https://kafka.apache.org/downloads 2.1.1版本
下载后解压
修改 config目录下 zookeeper.properties dataDir 指定zk数据存放目录(默认是linux目录结构)
修改 config目录下 server.properties log log.dirs 指定kafka日志存放目录(默认是linux目录结构)
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
zk端口默认2181
.\bin\windows\kafka-server-start.bat config\server.properties
kafka默认端口 9092
使用kafka_toole工具连接测试:
测试成功,单节点搭建完成
开发程序,模拟生产者和消费者
# 引入jar包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
spring.kafka.producer.bootstrap-servers=localhost:9092 # 配置kafka地址 spring.kafka.consumer.max-poll-records=1 #每次拉取一条数据 方便测试 spring.kafka.consumer.heartbeat-interval=100 # 心跳时间100ms
使用KafkaTemplate
controller和service代码省略
package com.example.kafka_test.service.impl; import com.example.kafka_test.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import static com.example.kafka_test.KafkaConstant.TOPIC_NAME; /** * @Author: Zy * @Date: 2021/11/25 10:00 */ public class ProducerServiceImpl implements ProducerService { private KafkaTemplate<String, String> kafkaTemplate; public void sendMsg(String msg) { kafkaTemplate.send(TOPIC_NAME, msg); } }
package com.example.kafka_test.service.impl; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import static com.example.kafka_test.KafkaConstant.CONSUMER_GROUP; import static com.example.kafka_test.KafkaConstant.TOPIC_NAME; /** * @Author: Zy * @Date: 2021/11/25 10:13 */ public class ConsumerServiceImpl { topics = TOPIC_NAME, groupId = CONSUMER_GROUP) ( public void consumerMsg(String msg) { try { # 休眠 防止消费速度快 无法观察日志 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(msg); } }
使用postman调用生产者造数据
正常启动,且开始消费
同样的分区,同样的消费者组
启动成功后,发现第一个消费者大量输出重平衡日志,且重平衡期间数据并没有消费
消费者的数量变动会触发Rebalance 重平衡期间数据不消费
Attempt to heart beat failed since the group is rebalancing
修改后像测试1一样重启,发现大量报错
还原报错成功,总结一下
Attempt to heart beat failed since the group is rebalancing
先分析一下这句话,发送心跳请求失败,消费者组正在重平衡
也就是说触发这个问题的条件有两个:
在kafka 0.11版本之前,心跳请求是跟poll()请求一起发送的,即拉取一次数据发送一次心跳
在kafka 0.11版本之后,心跳请求是单独的线程,由 spring.kafka.consumer.heartbeat-interval 参数控制心跳请求的间隔时间
触发重平衡的情况如下:
经过以上分析,可以得到结果;
由于拉取数据消费过慢,两次poll之间的时间超过了session.timeout.ms,被认为此消费者已死亡,触发了rebalance,而在rebalance的过程中,发送心跳请求导致的报错.
其中***的办法就是2 3
最重要的就是poll到的数据要在session.timeout.ms时间内处理完.