更新时间:2023-01-09 23:34:13
EDIT
不再需要做这个反射 hack;只需将 ListenerContainerCustomizer
@Bean
添加到应用程序上下文即可.参见 此处.
It is no longer necessary to do this reflection hack; just add a ListenerContainerCustomizer
@Bean
to the application context. See here.
EDIT_END
不再支持spring-kafka 1.1.x;你应该使用 1.3.9 和引导 1.5.x.
spring-kafka 1.1.x is no longer supported; you should be using 1.3.9 with boot 1.5.x.
当前的 Boot 1.5.x 版本是 1.5.21.
The current Boot 1.5.x version is 1.5.21.
您应该立即升级.
但是,所有这些项目都有更新的版本.
However, there a much newer versions of all of these projects.
Spring Cloud Stream 不使用该工厂或引导属性来创建其容器;它没有公开在容器上配置该属性的机制.
Spring Cloud Stream doesn't use that factory, or the boot properties, to create its containers; it doesn't expose a mechanism to configure that property on the container.
Spring Cloud Stream 2.1 添加了 ListenerContainerCustomizer
,它允许您通过设置任何属性来自定义绑定容器.
Spring Cloud Stream 2.1 added the ListenerContainerCustomizer
which allows you to customize the binding container by setting any properties on it.
我建议您升级到 Boot 2.1.6 和 Spring Cloud Stream Germantown (2.2.0).
I suggest you upgrade to Boot 2.1.6 and Spring Cloud Stream Germantown (2.2.0).
编辑
这有点麻烦,但它应该可以工作,直到您可以升级到更新的流版本...
This is a bit of a hack, but it should work until you can upgrade to a newer stream release...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {
public static void main(String[] args) {
SpringApplication.run(So56883620Application.class, args).close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@StreamListener(Sink.INPUT)
public void listen(String in) throws InterruptedException {
this.latch.countDown();
System.out.println(in);
Thread.sleep(6_000);
System.out.println("exiting");
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
// wait for listener to start
this.latch.await(10, TimeUnit.SECONDS);
System.out.println("Shutting down");
};
}
@Bean
public SmartLifecycle bindingFixer(BindingService bindingService) {
return new SmartLifecycle() {
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public void stop() {
// no op
}
@Override
public void start() {
@SuppressWarnings("unchecked")
Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
.getPropertyValue("consumerBindings");
@SuppressWarnings("unchecked")
Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
.getPropertyValue("lifecycle.messageListenerContainer"))
.getContainerProperties().setShutdownTimeout(30_000L);
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void stop(Runnable callback) {
callback.run();
}
@Override
public boolean isAutoStartup() {
return true;
}
};
}
}