且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Spring Integration中的REST端点使消息传递通道成为多线程

更新时间:2022-06-18 23:46:18

你这里没有队列,因为每个HTTP请求都在自己的线程中执行。是的,当http线程池耗尽时,你可能无论如何都在那里排队,但只有两个请求就不会出现在你的简单用例中。

You don't have a queue here because each HTTP request is performed in its own thread. Right, you maybe queued there anyway when the pool of http threads is exhausted, but that doesn't look in your simple use-case with just two requests.

你可以无论如何都要在那里实现队列行为,但你应该将 toSftpChannel 声明为 QueueChannel bean。

You can achieve a queue behavior there anyway, but you should declare your toSftpChannel as a QueueChannel bean.

这样下游进程将始终在同一个线程上执行,下一条消息将在第一个消息之后完全从队列中提取。

That way downstream process will always be performed on the same thread and the next message is pulled from the queue exactly after the first one.

参见参考手册了解更多信息。

更新

由于您使用 FtpMessageHandler 这是单向组件,但您仍需要回复MVC控制器的方法,只有这样做才能获得 @Gateway 方法非 - void 返回a当然我们需要以某种方式发送回复。

Since you use FtpMessageHandler which is one-way component, but you still need some reply to the MVC controller's methods, only the way to do that is to have a @Gateway method with non-void return and of course we need to send the reply somehow.

为此我建议使用 PublishSubscribeChannel

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
    SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
    return handler;
}

这样我们就有两个 toSftpChannel 。使用 @Order(0),我们确保 @ServiceActivator 是第一个订户,因为我们需要先执行SFTP传输。使用 @BridgeTo ,我们将相同的 PublishSubscribeChannel $ c C>。其目的只是获取 replyChannel 标头并在那里发送请求消息。由于我们不使用任何线程, BridgeHandler 将在完成转移到SFTP后完成。

This way we have two subscribers to the toSftpChannel. With the @Order(0) we ensure that @ServiceActivator is the first subscriber since we need to perform SFTP transfer first. With the @BridgeTo we add a second BridgeHandler to the same PublishSubscribeChannel. Its purpose is just to get a replyChannel header and send the request message there. Since we don't use any threading the BridgeHandler will be performed exactly after finish of the transfer to the SFTP.

当然代替 BridgeHandler ,您可以拥有任何其他 @ServiceActivator @Transfromer 作为回复返回而不是请求文件,但其他任何内容。例如:

Of course instead of BridgeHandler you can have any other @ServiceActivator or @Transfromer to return as a reply not a request File, but anything else. For example:

@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
    return "The SFTP transfer complete for file: " + payload;
}