Updated: 2022-11-05 22:00:00
First, list all the consumer groups that are available:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Then describe a group to see which topics it consumes, and find the group your topic belongs to:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe
Once you have found your topic and its associated group name, try the properties below and let me know if it works. If the topic does not belong to the default group, replace the group.id value with your group name:
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group"); // default group name; replace with your group
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe the consumer to a list of topics.
consumer.subscribe(Arrays.asList(topicName)); // replace topicName with your topic name
// Print the name of every topic visible to this consumer.
Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
System.out.println("list of topic size :" + listTopics.size());
for (String topic : listTopics.keySet()) {
    System.out.println("topic name :" + topic);
}
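If you end up trying several group names, it may help to keep the settings above in one small helper so that only the group name changes between runs. The following is a minimal sketch, not part of the Kafka API: the class name ConsumerConfigSketch and the method buildConsumerProps are made up for illustration, and the values are the same ones used above. It only relies on java.util.Properties, so it runs without a broker.

```java
import java.util.Properties;

class ConsumerConfigSketch {

    // Hypothetical helper (not a Kafka API): returns the consumer settings
    // used in the answer, with the group name passed in by the caller.
    static Properties buildConsumerProps(String bootstrapServers, String groupId) {
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException("group.id must not be empty");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Use the group name given on the command line, or fall back to the default.
        String groupId = args.length > 0 ? args[0] : "test-consumer-group";
        Properties props = buildConsumerProps("localhost:9092", groupId);
        // Print the settings so you can confirm which group will be used.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The returned Properties object can be passed to new KafkaConsumer<>(props) in place of the props built by hand above.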