更新时间:2023-10-08 17:44:28
感谢 Oleg 发布此解决方案背后的想法.从本质上讲,我增强了他的建议,以概括地处理以下两者之间的桥梁:
Thanks to Oleg for posting the idea behind this solution. Essentially, I enhanced his proposal to generically handle a bridge between:
This solution encapsulates the concerns described in Oleg example, inside a custom implementation of a Supplier
. Such implementation exposes an API to trigger the Supplier
to emit a message passed as parameter. Such a class would look like the following:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.function.Supplier;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
public class StreamSupplier implements Supplier<Flux<?>> {
private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
"spring.cloud.stream.sendto.destination";
public static <T> Message<?> createMessage(T payload, String destination) {
MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
if (destination != null && !destination.isEmpty())
builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
return builder.build();
}
private String defaultDestination;
private EmitterProcessor<? super Object> processor = EmitterProcessor.create();
public StreamSupplier() {
this(null);
}
public StreamSupplier(String defaultDestination) {
this.defaultDestination = defaultDestination;
}
// SEND APIs
public <T> Message<?> sendMessage(T payload) {
return sendMessage(payload, defaultDestination);
}
public <T> Message<?> sendMessage(T payload, String destination) {
return sendBody(createMessage(payload, destination));
}
public <T> T sendBody(T body) {
processor.onNext(body);
return body;
}
/**
* Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
* the output binding associated with this {@link Supplier}. Such programmatic publications
* are available through the {@code sendXXX} API methods available in this class.
*/
@Override
public Flux<?> get() {
return processor;
}
}
然后,开发人员只需:
Supplier
实现的实例注册为Spring
应用程序中的bean
;然后让spring-cloud-function
将此bean
扫描到FunctionCatalog
中.Supplier
将任何消息转发到流式基础结构-可以使用spring-cloud-stream
的所有提示进行配置. Supplier
implementation as a bean
in a Spring
application; and let spring-cloud-function
scan this bean
into the FunctionCatalog
.Supplier
- which can be configured using all the bells and whistles of spring-cloud-stream
. 以下示例对此进行了说明:
The following example demonstrate this:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
@SpringBootApplication
@Controller
public class MyApp {
public static void main(String[] args) {
SpringApplication.run(MyApp.class,
"--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
}
// Functional Web Controller
@Bean
public Function<String, String> webToStreamFunction() {
return msg -> streamSupplier().sendBody(msg);
}
// Functional Stream Supplier
@Bean
public Supplier<Flux<?>> streamSupplierFunction() {
return new StreamSupplier();
}
// DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
// LIMITATION OF SPRING-CLOUD-FUNCTION
@Bean
public StreamSupplier streamSupplier() {
return (StreamSupplier) streamSupplierFunction();
}
}
再次,我要感谢 Oleg 提供的
Again, I want to thanks Oleg for providing the required details I was looking for to build this comprehensive solution.