且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

是否可以在 Camel 中捕获 netty 异常?

更新时间:2023-11-11 12:23:04

回答我自己的问题.我的问题是发布客户端,这些客户端将永远等待从 netty 获得某种响应,主要是在处理管道期间远程主机关闭连接的情况下.

Answering my own question. My problem was releasing clients that would just wait forever to get some kind of response from netty mostly in case of connections closed by remote hosts during processing the pipeline.

需要做的是向管道添加一个自定义处理程序,该处理程序应该扩展ChannelDuplexHandler 并覆盖connect 并编写methodsSimpleChannelInboundHandler 并覆盖 channelInactive.我使用了 ChannelDuplexHandler.

What needs to be done is to add a custom handler to the pipeline that should extend ChannelDuplexHandler and override connect and write methods or SimpleChannelInboundHandler and override channelInactive. I used ChannelDuplexHandler.

public class ExceptionHandler extends ChannelDuplexHandler {

private final NettyProducer producer;

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise)
        throws Exception {
    ctx.connect(remoteAddress, localAddress, promise)
            .addListener((future -> {
                if (!future.isSuccess()) {
                    // no need to do anything here, camel will manage it on its own
                }
            }));
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    ctx.write(msg, promise).addListener(future -> {
        if (!future.isSuccess()) {
            reportStatusBackToCamel(ctx);
        }
    });
}

private void reportStatusBackToCamel(ChannelHandlerContext ctx) {
    NettyCamelState nettyCamelState = producer.getCorrelationManager().getState(ctx, ctx.channel(),
            new IOException());
    Exchange exchange = nettyCamelState.getExchange();
    AsyncCallback callback = nettyCamelState.getCallback();
    exchange.setException(new RuntimeException("Client disconnected"));
    callback.done(false);
    }
}

如果是 SimpleChannelInboundHandler,只需将交换处理放入 channelInactive 方法.

In case of SimpleChannelInboundHandler just put exchange handling into channelInactive method.

initChannelClientInitializerFactory 中,将此处理程序添加到管道中:

In your ClientInitializerFactory in initChannel you add this handler to the pipeline:

 pipeline.addLast(new ExceptionHandler(producer));

producer 在应用程序启动时提供给您.如果你像我一样需要额外的 spring 注入 bean,你最终只会在你的工厂类中有几个构造函数,一个 @Autowired(带有你注入的字段)调用另一个设置额外的生产者字段.

producer is given to you on application startup. If you need additional spring injected beans as I did, you simply end up having a couple of constructors in your factory class, one @Autowired (with your injected fields) calling the other setting additional producer field.