
且构网 - 分享程序员编程开发的那些事

Kafka Java Consumer已经关闭

更新时间:2022-03-18 22:28:12


This is happening because you are closing the consumer at the end of your infinite loop so when it polls a second time the consumer has been closed. To handle the immediate problem I'd wrap the entire while(true) loop in a try-catch and handle the consumer close in the catch or finally block.


However if different shutdown signals aren't handled carefully with a Kafka consumer you run the risk of losing data. I'd recommend looking at Confluent's example for graceful consumer shutdown here. In your case since you're running in the main thread it'd look something like this ...

public static void main(String[] args) {
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);

    Runtime.getRuntime().addShutdownHook(new Thread()
      public void run() {

    try {
      while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
          ConsumerRecord<String, String> consumerRecord = iterator.next();
          String key = consumerRecord.key();
          String value = consumerRecord.value();
          if (key == "exit" || value == "exit")
          System.out.println("Key=" + key + "\tValue=" + value);
        System.out.println("Messages processed = " + Integer.toString(i));
    } catch (WakeupExection e) {
      // Do Nothing
    } finally {


basically running consumer.wakeup() is the only threadsafe method in the consumer so it's the only one than can be ran inside of Java's shutdown hook. Since the consumer is not asleep when wakeup is called it trips the wakeupexection which falls through to gracefully shutting down the consumer.