且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Spring集成DSL Scatter-Gather异步/并行执行多个接收者流

更新时间:2022-12-03 22:15:18

使用提到的executor channel确实可以做到这一点.您所有的接收者流程实际上都必须从ExecutorChannel开始.在您的情况下,您必须将它们全部修改为如下形式:

This is indeed possible with the mentioned executor channel. All you recipient flows must really start from the ExecutorChannel. In your case you have to modify all of them to something like this:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

请注意IntegrationFlows.from(MessageChannels.executor(taskExexecutor())).这就是使每个子流异步的方式.

Pay attention to the IntegrationFlows.from(MessageChannels.executor(taskExexecutor())). That's exactly how you can make each sub-flow async.

更新

对于未对子流进行IntegrationFlow改进的较早版本的Spring Integration,我们可以这样做:

For the older Spring Integration version without IntegrationFlow improvement for the sub-flows we can do like this:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

这类似于您在上面的注释中显示的内容.

This is similar to what you show in the comment above.