且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

简单的 Kafka 消费者没有收到消息

更新时间:2022-11-05 22:00:00

首先使用以下方法检查所有可用的组:

First check what all the groups are available by using :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后使用下面的 cmd 检查您的主题属于哪个组:

Then check for which group your topic belongs by using below cmd :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

一旦您找到您的主题和关联的组名(如果它不属于默认组,只需将 group.id 替换为您的组)然后尝试使用以下道具并让我知道它是否有效:

Once you find your topic and associated group name (just replace group.id with your group if it not belongs to default group) then try with below prop and let me know if it works :

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default topic name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace you topic name

  //print the topic name

  java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for(String topic : listTopics.keySet()){
      System.out.println("topic name :"+topic);
  }