How to create a custom source application for the spring-cloud-stream-app-starters project

Updated: 2023-01-09 23:34:31

See the Reference Manual.

The WebSocketInboundChannelAdapter is an event-driven channel adapter, not a pollable source. So all you need is a @Bean for it and an appropriate reference to Source.OUTPUT.
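
To make the distinction concrete, here is a minimal plain-Java sketch that does not use any Spring Integration classes. PollableSource, EventDrivenAdapter and demo() are hypothetical names invented for illustration, and the list stands in for the output channel. It only shows the difference in who initiates delivery: a pollable source waits to be asked, while an event-driven adapter pushes as soon as the transport hands it data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class EventDrivenVsPollable {

    /** Pollable source (hypothetical): produces nothing until something calls get(). */
    static final class PollableSource implements Supplier<String> {
        private int counter;

        @Override
        public String get() {
            return "polled-" + (++counter);
        }
    }

    /** Event-driven adapter (hypothetical): forwards each event to its output on arrival. */
    static final class EventDrivenAdapter {
        private final Consumer<String> output;

        EventDrivenAdapter(Consumer<String> output) {
            this.output = output;
        }

        // In a real adapter the transport (for example a WebSocket session) calls this.
        void onMessage(String payload) {
            output.accept(payload);
        }
    }

    /** Runs both styles and returns everything that reached the channel, in order. */
    public static List<String> demo() {
        List<String> channel = new ArrayList<>();

        // Pollable: an external poller has to ask for every message.
        PollableSource pollable = new PollableSource();
        channel.add(pollable.get());
        channel.add(pollable.get());

        // Event-driven: no poller involved, the adapter pushes when an event arrives.
        EventDrivenAdapter adapter = new EventDrivenAdapter(channel::add);
        adapter.onMessage("foo");

        return channel;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [polled-1, polled-2, foo]
    }
}
```

This is why no poller configuration is needed for the WebSocket adapter: it only has to know which channel to push to.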

The WebSocketInboundChannelAdapter doesn't start a server. That is the responsibility of the ServerWebSocketContainer:

/**
 * The {@link IntegrationWebSocketContainer} implementation for the {@code server}
 * {@link org.springframework.web.socket.WebSocketHandler} registration.
 * <p>
 * Registers an internal {@code IntegrationWebSocketContainer.IntegrationWebSocketHandler}
 * for provided {@link #paths} with the {@link WebSocketHandlerRegistry}.
 * <p>
 * The real registration is based on Spring Web-Socket infrastructure via {@link WebSocketConfigurer}
 * implementation of this class.
 *
 * @author Artem Bilan
 * @author Gary Russell
 * @since 4.1
 */
public class ServerWebSocketContainer extends IntegrationWebSocketContainer
        implements WebSocketConfigurer, SmartLifecycle {

We also have documentation on the matter.

There is also a stomp-chat sample that demonstrates the server behavior.

I don't think you need an "underlying broker" in this kind of source application: you just receive messages over the WebSocket and publish them to Source.OUTPUT. Why do you need a STOMP broker here?

UPDATE

I have just tested this code against the Rabbit Binder:

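import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;

@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamWebSocketSourceApplication {

    // Event-driven adapter: every incoming WebSocket message is sent to Source.OUTPUT.
    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
        WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
                new WebSocketInboundChannelAdapter(serverWebSocketContainer());
        webSocketInboundChannelAdapter.setOutputChannelName(Source.OUTPUT);
        return webSocketInboundChannelAdapter;
    }

    // Server-side container: registers the WebSocket handler for the "/test" path.
    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/test")
                .withSockJs()
                .setAllowedOrigins("*");
    }

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamWebSocketSourceApplication.class, args);
        System.out.println("Done");
    }

}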

My test case is as follows:

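import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
// Spring Boot 2.x package; in Spring Boot 1.x this annotation lives in
// org.springframework.boot.context.embedded instead.
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class CloudStreamWebSocketSourceApplicationTests {

    @LocalServerPort
    private int port;

    @Test
    public void testWebSocketStreamSource() throws IOException, InterruptedException {
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();

        // With SockJS enabled on the server, the raw WebSocket endpoint is /test/websocket.
        ClientWebSocketContainer clientWebSocketContainer =
                new ClientWebSocketContainer(webSocketClient, "ws://localhost:" + this.port + "/test/websocket");
        clientWebSocketContainer.start();

        WebSocketSession session = clientWebSocketContainer.getSession(null);

        session.sendMessage(new TextMessage("foo"));

        // No assertions here: the pause only gives the message time to reach the binder.
        Thread.sleep(10000);
    }

}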

Here are my dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>