且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Kafka Java Consumer已经关闭

更新时间:2022-03-18 22:28:12

之所以发生这种情况,是因为您要在无限循环结束时关闭使用者,因此当它轮询第二次关闭使用者时.为了解决眼前的问题,我将整个while(true)循环包装在try-catch中,并在catch或finally块中处理使用者关闭.

This is happening because you are closing the consumer at the end of your infinite loop so when it polls a second time the consumer has been closed. To handle the immediate problem I'd wrap the entire while(true) loop in a try-catch and handle the consumer close in the catch or finally block.

但是,如果Kafka用户未正确处理不同的关闭信号,则可能会丢失数据.我建议您查看Confluent的示例以实现正常的消费者关机

However if different shutdown signals aren't handled carefully with a Kafka consumer you run the risk of losing data. I'd recommend looking at Confluent's example for graceful consumer shutdown here. In your case since you're running in the main thread it'd look something like this ...

public static void main(String[] args) {
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      public void run() {
        consumer.wakeup();
      }
    });

    try {
      while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
          i++;
          ConsumerRecord<String, String> consumerRecord = iterator.next();
          String key = consumerRecord.key();
          String value = consumerRecord.value();
          if (key == "exit" || value == "exit")
            break;
          System.out.println("Key=" + key + "\tValue=" + value);
        }
        System.out.println("Messages processed = " + Integer.toString(i));
      }
    } catch (WakeupExection e) {
      // Do Nothing
    } finally {
      consumer.close();
    }
  }
}

基本上运行consumer.wakeup()是使用者中唯一的线程安全方法,因此它是唯一可以在Java的shutdown钩子内部运行的方法.由于调用唤醒时消费者未处于睡眠状态,因此它将触发唤醒执行,该唤醒失败后将正常关闭用户.

basically running consumer.wakeup() is the only threadsafe method in the consumer so it's the only one than can be ran inside of Java's shutdown hook. Since the consumer is not asleep when wakeup is called it trips the wakeupexection which falls through to gracefully shutting down the consumer.