且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序?

更新时间:2023-01-09 23:34:13

EDIT

不再需要做这个反射 hack;只需将 ListenerContainerCustomizer @Bean 添加到应用程序上下文即可.参见 此处.

It is no longer necessary to do this reflection hack; just add a ListenerContainerCustomizer @Bean to the application context. See here.

EDIT_END

不再支持spring-kafka 1.1.x;你应该使用 1.3.9 和引导 1.5.x.

spring-kafka 1.1.x is no longer supported; you should be using 1.3.9 with boot 1.5.x.

当前的 Boot 1.5.x 版本是 1.5.21.

The current Boot 1.5.x version is 1.5.21.

您应该立即升级.

但是,所有这些项目都有更新的版本.

However, there a much newer versions of all of these projects.

Spring Cloud Stream 不使用该工厂或引导属性来创建其容器;它没有公开在容器上配置该属性的机制.

Spring Cloud Stream doesn't use that factory, or the boot properties, to create its containers; it doesn't expose a mechanism to configure that property on the container.

Spring Cloud Stream 2.1 添加了 ListenerContainerCustomizer,它允许您通过设置任何属性来自定义绑定容器.

Spring Cloud Stream 2.1 added the ListenerContainerCustomizer which allows you to customize the binding container by setting any properties on it.

我建议您升级到 Boot 2.1.6 和 Spring Cloud Stream Germantown (2.2.0).

I suggest you upgrade to Boot 2.1.6 and Spring Cloud Stream Germantown (2.2.0).

编辑

这有点麻烦,但它应该可以工作,直到您可以升级到更新的流版本...

This is a bit of a hack, but it should work until you can upgrade to a newer stream release...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {

    public static void main(String[] args) {
        SpringApplication.run(So56883620Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        this.latch.countDown();
        System.out.println(in);
        Thread.sleep(6_000);
        System.out.println("exiting");
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
            // wait for listener to start
            this.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Shutting down");
        };
    }

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                @SuppressWarnings("unchecked")
                Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                @SuppressWarnings("unchecked")
                Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                        .getPropertyValue("lifecycle.messageListenerContainer"))
                                .getContainerProperties().setShutdownTimeout(30_000L);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}