且构网

How to configure a function's bindings in Spring Cloud Stream so that its input is bound to a web endpoint and its output is bound to a Kafka topic

Updated: 2023-10-08 17:44:28

Thanks to Oleg for posting the idea behind this solution. Essentially, I extended his proposal so that it generically handles a bridge between:

  1. A functional web controller, which can receive web requests.
  2. A stream supplier, which can forward any message to a messaging infrastructure.

This solution encapsulates the concerns described in Oleg's example inside a custom implementation of a Supplier. This implementation exposes an API that triggers the Supplier to emit a message passed as a parameter. Such a class looks like the following:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

public class StreamSupplier implements Supplier<Flux<?>> {

    private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
            "spring.cloud.stream.sendto.destination";

    public static <T> Message<?> createMessage(T payload, String destination) {
        MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
        if (destination != null && !destination.isEmpty()) {
            builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
        }
        return builder.build();
    }

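    private final String defaultDestination;
    // Note: EmitterProcessor is deprecated since Reactor 3.4; Sinks.many() is its replacement.
    private final EmitterProcessor<? super Object> processor = EmitterProcessor.create();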

    public StreamSupplier() {
        this(null);
    }

    public StreamSupplier(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // SEND APIs

    public <T> Message<?> sendMessage(T payload) {
        return sendMessage(payload, defaultDestination);
    }

    public <T> Message<?> sendMessage(T payload, String destination) {
        return sendBody(createMessage(payload, destination));
    }

    public <T> T sendBody(T body) {
        processor.onNext(body);
        return body;
    }

    /**
     * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
     * the output binding associated with this {@link Supplier}. Such programmatic publications
     * are available through the {@code sendXXX} API methods available in this class.
     */
    @Override
    public Flux<?> get() {
        return processor;
    }
}
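
To make the mechanism easier to see, here is a minimal sketch of the same bridge idea using only the JDK. It is not the Reactor or Spring API: `BridgeSketch`, `TinyStream`, and `send` are hypothetical names, with `TinyStream` standing in for the Flux returned by `get()` and `send` standing in for `sendBody`. The list named `topic` only imitates what the binder would forward to the messaging infrastructure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, JDK-only stand-in for StreamSupplier (an assumption for illustration).
class BridgeSketch implements Supplier<BridgeSketch.TinyStream> {

    // Minimal hot stream: a subscriber sees every value sent after it subscribed.
    static final class TinyStream {
        private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<Object> subscriber) {
            subscribers.add(subscriber);
        }

        private void emit(Object value) {
            subscribers.forEach(s -> s.accept(value));
        }
    }

    private final TinyStream stream = new TinyStream();

    // Called by the web side: pushes the body into the stream and returns it.
    <T> T send(T body) {
        stream.emit(body);
        return body;
    }

    // Called by the "binder" side, which subscribes and forwards downstream.
    @Override
    public TinyStream get() {
        return stream;
    }

    public static void main(String[] args) {
        BridgeSketch bridge = new BridgeSketch();
        List<Object> topic = new ArrayList<>();   // stands in for the Kafka topic
        bridge.get().subscribe(topic::add);       // what the binder would do
        bridge.send("hello");                     // what the web function would do
        System.out.println(topic);                // prints [hello]
    }
}
```

The point of the design is that the object handed to the streaming side and the object called by the web side are one and the same, so anything passed to `send` reaches whoever subscribed to the stream.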

Then, a developer only has to:

  1. Register an instance of this particular Supplier implementation as a bean in a Spring application, and let spring-cloud-function scan this bean into the FunctionCatalog.
  2. Create a web function that forwards any message to the streaming infrastructure using the previously registered Supplier, which can be configured using all the bells and whistles of spring-cloud-stream.

The following example demonstrates this:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;

import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

@SpringBootApplication
@Controller
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class,
                "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
    }

    // Functional Web Controller
    @Bean
    public Function<String, String> webToStreamFunction() {
        return msg -> streamSupplier().sendBody(msg);
    }

    // Functional Stream Supplier
    @Bean
    public Supplier<Flux<?>> streamSupplierFunction() {
        return new StreamSupplier();
    }

    // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
    // LIMITATION OF SPRING-CLOUD-FUNCTION
    @Bean
    public StreamSupplier streamSupplier() {
        return (StreamSupplier) streamSupplierFunction();
    }
}
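
As a rough sketch of how the two ends of this application might be addressed, the helper below builds the property that points the supplier's output binding at a topic, and an HTTP request for the web function. It relies on the Spring Cloud Stream convention that functional bindings are named `<function>-out-<index>`, and it assumes that spring-cloud-function-web is on the classpath and exposes the function under its bean name. The class `BridgeUsageSketch`, the topic `my-topic`, and the address `http://localhost:8080` are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helpers; host, port, and topic name used in main are assumptions.
class BridgeUsageSketch {

    // Spring Cloud Stream names functional output bindings <function>-out-<index>.
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Command-line property that points the first output binding at a destination.
    static String destinationArg(String functionName, String topic) {
        return "--spring.cloud.stream.bindings." + outputBinding(functionName, 0)
                + ".destination=" + topic;
    }

    // Builds (but does not send) a POST request for a function exposed by name.
    static HttpRequest postTo(String baseUrl, String functionName, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + functionName))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        System.out.println(destinationArg("streamSupplierFunction", "my-topic"));
        HttpRequest request = postTo("http://localhost:8080", "webToStreamFunction", "hello");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The string returned by `destinationArg` could be passed to `SpringApplication.run` next to the existing `spring.cloud.function.definition` argument, or written as the equivalent entry in the application's configuration file.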

Again, I want to thank Oleg for providing the details I needed to build this comprehensive solution.

Complete code on GitHub