Construction of Spring Cloud Stream

The program is set up to bind the channels INPUT and OUTPUT specified within the interface Processor by the annotation @EnableBinding. It is possible to set up both channels to utilize a certain message middleware or binder.

Java




@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }
  
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}


Functional Testing

A binder implementation serving as the test support enables channel interaction and message inspection. Send a message to the mentioned enrichLogMessage service and see if the answer includes the string “[1]:” at the start of the message.

Java




@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {
  
    @Autowired
    private Processor pipe;
  
    @Autowired
    private MessageCollector messageCollector;
  
    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is the message"))
          .build());
  
        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();
  
        assertEquals("[1]: This is the message", payload.toString());
    }
}


Reactive Functions support

You may use the reactive programming approach in the implementation of Supplier, Function, or Consumer with little effort because Spring Cloud Function is built on top of Project Reactor.

Java




@EnableAutoConfiguration
@EnableBinding(Processor.class)
public static class SinkFromConsumer {
    public static void main(String[] args) {
        SpringApplication.run(SinkFromConsumer.class
                              "--spring.cloud.stream.function.definition=reactiveUpperCase");
    }
    @Bean
    public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
        return flux -> flux.map(val -> val.toUpperCase());
    }
}


Spring Cloud Stream – Functional and Reactive

Spring Cloud Stream is a Spring framework that simplifies creating event-driven microservices. It uses functional programming constructs for message processing logic, often using annotated methods within a class and reactive programming tools like Reactor for asynchronous and reactive processing.

Maven Dependencies

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.1.3</version>
</dependency>

Similar Reads

Supplier, Function, and Consumer

SCSt treats any bean of type Supplier, Function, or Consumer as a message handler (Function or Consumer) or message source (Supplier), or any bean that may be mapped to Supplier, Function, or Consumer (e.g., a POJO function, Kotlin lambdas, and so forth). Input and output bindings are automatically produced using the -- naming standard, depending on the type of functional strategy that is being utilized....

Construction of Spring Cloud Stream

...

Conclusion

...

Contact Us