Spring Cloud Stream – Functional and Reactive
Spring Cloud Stream is a Spring framework that simplifies building event-driven microservices. Message-processing logic is expressed as functions, typically exposed through annotated (@Bean) methods in a configuration class, and it can use reactive types from Project Reactor for asynchronous, stream-oriented processing.
Maven Dependencies
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>3.1.3</version>
</dependency>
Supplier, Function, and Consumer
Spring Cloud Stream treats any bean of type Supplier, Function, or Consumer, or any bean that can be mapped to one of them (for example a POJO function or a Kotlin lambda), as a message source (Supplier) or a message handler (Function or Consumer). Depending on which functional type is used, input and output bindings are created automatically following the <function-name>-<in/out>-<index> naming convention.
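To make the naming convention concrete, here is a minimal plain-Java sketch that derives binding names from a function name. BindingNames and its Kind enum are hypothetical helpers written for this illustration; they are not part of Spring Cloud Stream, which computes these names internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Spring Cloud Stream class) that mirrors the
// <function-name>-<in/out>-<index> naming convention.
class BindingNames {

    enum Kind { SUPPLIER, FUNCTION, CONSUMER }

    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // A Supplier only produces, a Consumer only consumes, a Function does both.
    static List<String> forBean(String functionName, Kind kind) {
        List<String> names = new ArrayList<>();
        if (kind != Kind.SUPPLIER) {
            names.add(input(functionName, 0));
        }
        if (kind != Kind.CONSUMER) {
            names.add(output(functionName, 0));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(forBean("uppercase", Kind.FUNCTION)); // prints [uppercase-in-0, uppercase-out-0]
    }
}
```

These are the names you would reference in configuration, for example in a property such as spring.cloud.stream.bindings.uppercase-in-0.destination.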
Functional:
Java
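import java.util.function.Function;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class GFG {

    // Invoked once per incoming message; the return value is sent to the output binding.
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}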
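Because the handler is an ordinary java.util.function.Function, its logic can be exercised without a Spring context or a message broker. The sketch below assumes the same lambda is exposed through a static factory method; the class name UppercaseFunctionDemo is made up for this illustration.

```java
import java.util.function.Function;

// Plain-Java check of the handler logic; no Spring context or binder involved.
class UppercaseFunctionDemo {

    // Same lambda as the uppercase() bean, exposed through a static factory
    // (a hypothetical arrangement for this sketch).
    static Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

    public static void main(String[] args) {
        Function<String, String> fn = uppercase();
        System.out.println(fn.apply("hello")); // prints HELLO
    }
}
```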
Reactive:
Java
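import java.util.function.Function;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class GFG {

    // Receives the stream of messages as a Flux and returns the transformed Flux.
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}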
Construction of Spring Cloud Stream
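The application below uses the @EnableBinding annotation to bind the INPUT and OUTPUT channels defined in the Processor interface. Each channel can be configured to use a specific messaging middleware or binder. Note that this annotation-based model (@EnableBinding, @StreamListener) has been deprecated since Spring Cloud Stream 3.1 in favor of the functional model.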
Java
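import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    // Consumes from the INPUT channel and publishes the enriched message to OUTPUT.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}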
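For comparison, the same enrichment can be written in the functional style this article focuses on. The following is a minimal sketch in plain Java: the LogMessage class is a hypothetical stand-in (the article does not show its definition), and in a Spring application the function would be returned from a @Bean method rather than a static factory.

```java
import java.util.function.Function;

// Hypothetical stand-in for the article's LogMessage type, which is not shown.
class LogMessage {
    private final String message;

    LogMessage(String message) {
        this.message = message;
    }

    String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return message;
    }
}

class EnrichLogMessageFunction {

    // Same transformation as enrichLogMessage, expressed as a Function.
    static Function<LogMessage, LogMessage> enrichLogMessage() {
        return log -> new LogMessage(String.format("[1]: %s", log.getMessage()));
    }

    public static void main(String[] args) {
        LogMessage out = enrichLogMessage().apply(new LogMessage("This is the message"));
        System.out.println(out); // prints [1]: This is the message
    }
}
```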
Functional Testing
The test support is a binder implementation that lets you interact with the channels and inspect the messages passing through them. The test below sends a message to the enrichLogMessage service and checks that the response starts with the string "[1]: ".
Java
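import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        // Send a message to the input channel.
        pipe.input()
            .send(MessageBuilder.withPayload(new LogMessage("This is the message"))
                .build());

        // Read what the service published on the output channel.
        Object payload = messageCollector.forChannel(pipe.output())
            .poll()
            .getPayload();

        assertEquals("[1]: This is the message", payload.toString());
    }
}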
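The idea behind the test binder (send to the input, then inspect what arrives on the output) can be illustrated with a small plain-Java stand-in. InMemoryPipe is a hypothetical class written for this sketch, not a Spring Cloud Stream type, and it uses String payloads instead of LogMessage to stay short.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Plain-Java stand-in for the idea behind a test binder: messages sent to the
// input pass through the handler and are collected on the output for inspection.
// InMemoryPipe is hypothetical and is not a Spring Cloud Stream class.
class InMemoryPipe<I, O> {

    private final Function<I, O> handler;
    private final Queue<O> output = new ArrayDeque<>();

    InMemoryPipe(Function<I, O> handler) {
        this.handler = handler;
    }

    // Plays the role of pipe.input().send(...).
    void send(I payload) {
        output.add(handler.apply(payload));
    }

    // Plays the role of messageCollector.forChannel(...).poll(); null when empty.
    O poll() {
        return output.poll();
    }

    public static void main(String[] args) {
        InMemoryPipe<String, String> pipe =
            new InMemoryPipe<>(msg -> String.format("[1]: %s", msg));
        pipe.send("This is the message");
        System.out.println(pipe.poll()); // prints [1]: This is the message
    }
}
```

Unlike the real test binder, this sketch performs no message conversion and carries no headers; it only shows the send-then-inspect shape of the test.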
Reactive Functions support
Because Spring Cloud Function is built on top of Project Reactor, you can use the reactive programming model when implementing a Supplier, Function, or Consumer with little extra effort.
Java
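import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;

import reactor.core.publisher.Flux;

@EnableAutoConfiguration
@EnableBinding(Processor.class)
public class SinkFromConsumer {

    public static void main(String[] args) {
        // Select the function bean to bind by name.
        SpringApplication.run(SinkFromConsumer.class,
            "--spring.cloud.stream.function.definition=reactiveUpperCase");
    }

    @Bean
    public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
        return flux -> flux.map(val -> val.toUpperCase());
    }
}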
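One difference from the imperative style is worth showing: a reactive function takes the whole sequence of messages as its argument rather than a single message, so it is applied once to the stream instead of once per message. The sketch below illustrates that shape with java.util.stream.Stream standing in for Flux so that it runs without a Reactor dependency. This substitution is an assumption made for illustration only: a Stream is finite and pull-based, while a Flux is asynchronous and may be unbounded.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamShapedFunctionDemo {

    // Stream<String> stands in for Flux<String> so the sketch needs no Reactor dependency.
    static Function<Stream<String>, Stream<String>> upperCaseAll() {
        return stream -> stream.map(String::toUpperCase);
    }

    public static void main(String[] args) {
        // A single call to apply() transforms the whole sequence.
        List<String> result = upperCaseAll()
            .apply(Stream.of("a", "b", "c"))
            .collect(Collectors.toList());
        System.out.println(result); // prints [A, B, C]
    }
}
```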
Conclusion
This is how Spring Cloud Stream supports both functional and reactive programming. It simplifies building event-driven microservices: message-processing logic is written as Supplier, Function, and Consumer beans, and Reactor types such as Flux enable asynchronous, reactive processing.