问题最近想试试 kafka 的相关操作,在 spring-kafka 3.0.1 版本下。发现当我指定消费者去消费指定分区时,kafka 无法检测到该消费者(消费组内无消费者),但是却可以正常消费。 如果不指定,而是由 kafka 自动平衡消费者的话,就可以检测到
// 该代码正常
@KafkaListener(id = "consumer-all", clientIdPrefix = "consumer-all", topics = "topic1", groupId = "mygroup", concurrency = "1")
public void listenAllOne(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第一个消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
// 该消费者可以正常消费,但是无法被 kafka 检测到
@KafkaListener(id="consumer-1", clientIdPrefix = "consumer-1", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0,1", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
public void listenTop1(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第一个分区消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
@KafkaListener(id="consumer-2", clientIdPrefix = "consumer-2", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "2,3", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
public void listenTop2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第二个分区消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
CLI查询的方式也是一样的结果,如图
1
wangxin3 2023-01-06 16:13:31 +08:00
web 监控界面是第三方的吧,可能没有适配?
|
2
wangxin3 2023-01-06 16:22:01 +08:00
可以试试 kafka 的官方命令行来查询
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 你的 groupId |
3
loveaeen OP 这个我也试了。也是没有 active members
![]( https://hexo-1302895213.cos.ap-shanghai.myqcloud.com/blog/202301061639200.png) |
5
novolunt 2023-01-06 16:44:03 +08:00
https://www.kafkatool.com/ GUI 客户端看看
|