Handling Streaming Data with Webflux

Handling streaming data with WebFlux in a Spring Boot application involves leveraging Spring WebFlux’s reactive programming capabilities to manage asynchronous data streams efficiently. WebFlux supports both server-side and client-side reactive programming using the Project Reactor library. Below is an example that demonstrates how to handle streaming data using WebFlux.

Key Concepts:

  • Reactive Programming: Reactive programming is about building non-blocking, asynchronous applications that can handle large amounts of data with minimal overhead. Spring WebFlux is built on Project Reactor, which provides the Flux and Mono types for handling reactive streams.
  • Flux and Mono:
    • Flux – Represents a reactive sequence of 0 to N items.
    • Mono -Represents a reactive sequence of 0 or 1 items
  • Backpressure: The ability to handle varying data production and consumption rates. Project Reactor provides operators and strategies to manage Backpressure effectively.
  • Server-Sent Events: A standard for streaming data from server to client over HTTP. WebFlux supports SSE, which is ideal for streaming data scenarios.


Prerequisites:

  • Spring Framework
  • Spring WebFlux
  • Publisher and Consumer in Spring WebFlux
  • RESTful APIs and It’s Workflow
  • HTTP Status codes
  • Spring Stater project creation
  • Streams

Tools & Technologies:

  • Spring Boot version: 3.2.5
  • Spring Reactive programming
  • Spring Tool Suite

Steps to Handle Streaming Data with WebFlux

Here, we created one simple spring reactive project after that we created one RestController class for defining the API endpoints and we created one component class for Streaming Data in the project with required Dependencies Below we provide a detailed explanation with examples for better understanding the concept.

Dependencies:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>


Project Structure:

After project creation is done, the structure will be like the below:


Step 1: Create Component Class

Once the Spring Stater Project is created, then we create a component class in the project main package. This class have one method named getNumbers(). This method continuously returns events in the form of flux publisher. Here we use Flux.fromStream() method which is used for to build a Flux from an existing Java 8 stream or a Java 8 stream supplier

StreamSource.java:

Java
package com.app;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.stream.Stream;

@Component
public class StreamSource {
    public Flux<Integer> getNumbers() {
        return Flux.fromStream(Stream.iterate(1, n -> n + 1))
                   .delayElements(Duration.ofSeconds(1));
    }
}


Step 2: Create Controller Class

Now, we create a RestController class to define the API endpoints using the Get Mapping request type. And in that class, We define the method called streamNumbers(). And this method can return getNumbers() method by using StreamSource object. Here we use Flux publisher because stream is used on list of elements, The list elements are handled by the Flux publisher in Reactive programming.

Java
package com.app;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class StreamController {

    @Autowired
    private StreamSource streamSource;

    @GetMapping(value = "/stream/numbers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> streamNumbers() {
        return streamSource.getNumbers();
    }
}


Step 3: Run the Project

Now, run this project as Spring Boot app or you can this project by using Maven commands. However, we run this project as a Spring Boot App. This Spring application operates on the Netty server with the default port number set to 8080.


Step 4: Test the API

Finally, we need to test the API endpoint functionality. Here, we utilize the Postman tool for API testing, and below are the output images provided for reference.

http://localhost:8080/stream/numbers

If we hit the above API, then we will get stream data as output.

Output:




Contact Us