Updated: 2023-11-11 12:23:04
Answering my own question. My problem was releasing clients that would otherwise wait forever for some kind of response from Netty, mostly when the remote host closed the connection while the pipeline was still processing.
What needs to be done is to add a custom handler to the pipeline. It should either extend ChannelDuplexHandler and override the connect and write methods, or extend SimpleChannelInboundHandler and override channelInactive. I used ChannelDuplexHandler.
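import java.io.IOException;
import java.net.SocketAddress;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
// Package assumed for Camel 3.x (camel-netty); camel-netty4 on Camel 2.x uses org.apache.camel.component.netty4
import org.apache.camel.component.netty.NettyCamelState;
import org.apache.camel.component.netty.NettyProducer;

public class ExceptionHandler extends ChannelDuplexHandler {

    private final NettyProducer producer;

    // The producer is passed in from the initializer factory
    public ExceptionHandler(NettyProducer producer) {
        this.producer = producer;
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                        ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise).addListener(future -> {
            if (!future.isSuccess()) {
                // no need to do anything here, camel will manage it on its own
            }
        });
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.write(msg, promise).addListener(future -> {
            if (!future.isSuccess()) {
                // The write failed, so no response will ever arrive: release the waiting client
                reportStatusBackToCamel(ctx);
            }
        });
    }

    private void reportStatusBackToCamel(ChannelHandlerContext ctx) {
        NettyCamelState nettyCamelState = producer.getCorrelationManager().getState(ctx, ctx.channel(),
                new IOException());
        if (nettyCamelState == null) {
            return; // nothing is waiting on this channel (guard added as a precaution)
        }
        Exchange exchange = nettyCamelState.getExchange();
        AsyncCallback callback = nettyCamelState.getCallback();
        // Fail the exchange and signal completion so the caller stops waiting
        exchange.setException(new RuntimeException("Client disconnected"));
        callback.done(false);
    }
}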
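To make it clearer why failing the exchange and calling the callback releases the waiting client, here is a library-free model of the same idea. It is only a sketch: PendingRequests and its methods are hypothetical names standing in for Camel's correlation manager, and CompletableFuture stands in for the exchange plus callback pair. None of it is Netty or Camel API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the correlation manager: one pending reply per channel id.
final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // A caller registers and then waits on the returned future.
    CompletableFuture<String> register(String channelId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(channelId, reply);
        return reply;
    }

    // Normal path: a response arrived for this channel.
    void complete(String channelId, String response) {
        CompletableFuture<String> reply = pending.remove(channelId);
        if (reply != null) {
            reply.complete(response);
        }
    }

    // Failure path: the write failed or the peer went away.
    // Without this call the future would never complete and the caller would wait forever.
    void fail(String channelId, Throwable cause) {
        CompletableFuture<String> reply = pending.remove(channelId);
        if (reply != null) {
            reply.completeExceptionally(cause);
        }
    }
}
```

In this model, fail plays the role of reportStatusBackToCamel: it looks up whoever is waiting on the channel and finishes their request with an error instead of leaving it pending.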
If you go with SimpleChannelInboundHandler instead, just put the exchange handling into the channelInactive method.
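I did not go this route myself, so treat the following as an untested sketch of what the SimpleChannelInboundHandler variant might look like. DisconnectHandler and the onDisconnect hook are hypothetical names; the hook is where the exchange handling (the body of reportStatusBackToCamel) would go. Auto-release is switched off so that inbound messages can be forwarded unchanged to the handlers further down the pipeline.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

// Hypothetical handler: runs a callback when the channel becomes inactive.
final class DisconnectHandler extends SimpleChannelInboundHandler<Object> {

    // Assumed hook; in the Camel case this would fail the exchange and call callback.done(false).
    private final Runnable onDisconnect;

    DisconnectHandler(Runnable onDisconnect) {
        super(false); // autoRelease = false, because messages are passed on rather than consumed
        this.onDisconnect = onDisconnect;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        // Not interested in the payload: hand it to the next inbound handler untouched.
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // The remote host closed the connection (or the channel was closed locally).
        onDisconnect.run();
        super.channelInactive(ctx); // keep propagating the event
    }
}
```

It would be registered the same way as the duplex handler, for example pipeline.addLast(new DisconnectHandler(() -> ...)), with the lambda body doing the exchange handling.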
In the initChannel method of your ClientInitializerFactory, add this handler to the pipeline:
pipeline.addLast(new ExceptionHandler(producer));
The producer is given to you on application startup. If you need additional Spring-injected beans, as I did, you simply end up with a couple of constructors in your factory class: one @Autowired constructor (with your injected fields) that calls the other, which additionally sets the producer field.