How to Access the First Element of a Flux

Flux is one of the two publisher types provided by Project Reactor, the reactive library on which Spring WebFlux is built (the other is Mono). It implements the Reactive Streams Publisher interface and represents an asynchronous sequence of 0 to N elements. A Flux emits each element as soon as it becomes available, without blocking the calling thread.

In this article, we will learn how to access the first element of a Flux. Here, a Flux can be thought of as a sequence of elements that arrive over time.

Applications of Flux:

  • The Flux publisher can handle large amounts of data in a non-blocking manner.
  • It can be used to process data from different sources such as databases, HTTP requests, and others.
  • It offers many operators, such as filter, map, and reduce, for extracting the required data from large amounts of data.
  • Flux also supports backpressure, which allows a subscriber to control the rate at which the publisher emits data.
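
To make the backpressure point concrete, here is a minimal sketch in which the subscriber asks for one element at a time. The class name BackpressureSketch, the method consumeOneByOne, and the demandLog parameter are hypothetical names chosen for this illustration. BaseSubscriber and doOnRequest are part of Reactor Core.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class; the names are illustrative only.
class BackpressureSketch {

    // Consumes a small Flux with a demand of one element at a time.
    // Every request(n) seen by the source is appended to demandLog.
    static List<Integer> consumeOneByOne(List<Long> demandLog) {
        List<Integer> received = new ArrayList<>();

        Flux.range(1, 5)
            .doOnRequest(n -> demandLog.add(n)) // record each demand signal
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // ask for the first element only
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                    request(1); // ask for one more after handling this one
                }
            });

        // Flux.range emits synchronously, so all elements have arrived here.
        return received;
    }

    public static void main(String[] args) {
        List<Long> demandLog = new ArrayList<>();
        System.out.println("Received: " + consumeOneByOne(demandLog));
        System.out.println("Demand per request: " + demandLog);
    }
}
```

The source never sends more than the subscriber has requested, which is how a slow consumer avoids being flooded.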

Approaches to Access the First Element of a Flux

There are several ways to access the first element of a Flux. The methods listed below are all available on the Flux publisher, and each one is explained with an example in the sections that follow.

  • using next() method
  • using blockFirst() method
  • using take() method
  • using collectList() method

Prerequisites:

  • Spring WebFlux
  • Functionality of Publisher, Subscriber
  • Reactive Streams in Spring

Dependencies:

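To use Reactive Streams in Spring, we need to add the dependencies below to our Spring Boot project. The reactor-core entry is optional, because the WebFlux starter already brings it in transitively.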

dependencies {
    // Spring WebFlux
    implementation 'org.springframework.boot:spring-boot-starter-webflux'

    // Reactor Core
    implementation 'io.projectreactor:reactor-core'

    // Spring Boot Starter Test (if needed)
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

Program to Access the First Element of a Flux

A Flux is a publisher that can emit 0 to N elements. In our case, we need only the first of them. The most direct way to get it is the next() method, which is available on every Flux.

  • To run this logic, create a Spring Starter project using Spring Tool Suite.
  • While creating the project, select the dependencies mentioned above.
  • Once the project is created, open the class that contains the main method.
  • Add the logic below to it, then run the project as a Spring Boot application.

1. Using next() method

In this example,

  • We create a Flux of five Integer values, from 100 to 500.
  • We call next() on the Flux, which returns a Mono holding at most the first element, and then call subscribe() on it.
  • We pass three lambda expressions to subscribe(). The first receives the first element and prints it.
  • The second lambda prints an error message if an error occurs.
  • The third lambda runs on completion and tells the user that the operation has finished.

Java




package com.reactive.app;
  
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
import reactor.core.publisher.Flux;
  
@SpringBootApplication
public class SpringreactorApplication {
  
    public static void main(String[] args) {
        SpringApplication.run(SpringreactorApplication.class, args);
          
        // Create a Flux with some elements
        Flux<Integer> flux = Flux.just(100, 200, 300, 400, 500);
          
        // Access the first element of the Flux
        flux.next().subscribe(
                value -> System.out.println("First element of Flux: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Flux completed")
            );
    }
  
}


Output (after the Spring Boot startup logs):

First element of Flux: 100
Flux completed
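
The example above always has a first element. As a minimal sketch of the empty case: next() on an empty Flux gives a Mono that completes without a value, so only the completion callback runs. The class name NextOnEmptyFlux and the helper firstOrDefault are hypothetical names for this illustration, and block() is used only to keep the demo short.

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class; the names are illustrative only.
class NextOnEmptyFlux {

    // Returns the first element, or the fallback when the Flux is empty.
    static Integer firstOrDefault(Flux<Integer> source, Integer fallback) {
        return source.next()                 // Mono<Integer> with at most one value
                     .defaultIfEmpty(fallback)
                     .block();               // blocking only to keep the demo short
    }

    public static void main(String[] args) {
        System.out.println(firstOrDefault(Flux.just(100, 200, 300), -1)); // 100
        System.out.println(firstOrDefault(Flux.empty(), -1));             // -1

        // Without a default, an empty Flux triggers only the completion callback.
        Flux.<Integer>empty().next().subscribe(
                value -> System.out.println("First element of Flux: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed without a value"));
    }
}
```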

2. Using blockFirst() method

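This is another approach to access the first element of a Flux. Here, we use the blockFirst() method, which subscribes to the Flux, blocks the calling thread until the first element arrives, and returns that element. It returns null if the Flux completes without emitting anything. Because it blocks, it should be called only from threads that are allowed to block, such as the main thread in this example.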

Java




package com.reactive.app;
  
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
import reactor.core.publisher.Flux;
  
@SpringBootApplication
public class SpringreactorApplication {
  
    public static void main(String[] args) {
        SpringApplication.run(SpringreactorApplication.class, args);
        // Create a Flux with some elements
        Flux<Integer> flux = Flux.just(100, 200, 300, 400, 500);
          
        Integer firstElement = flux.blockFirst();
        System.out.println("First element of Flux: " + firstElement);
    }
}


Output (after the Spring Boot startup logs):

First element of Flux: 100
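
A call to blockFirst() with no arguments waits indefinitely if the Flux never emits. The sketch below uses the blockFirst(Duration) overload to bound the wait and wraps the result in an Optional to cover the empty case. The class name BlockFirstWithTimeout and the helper firstWithin are hypothetical names for this illustration.

```java
import java.time.Duration;
import java.util.Optional;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the names are illustrative only.
class BlockFirstWithTimeout {

    // Waits up to the given timeout for the first element.
    // Optional.empty() means the Flux completed without emitting anything.
    static <T> Optional<T> firstWithin(Flux<T> source, Duration timeout) {
        return Optional.ofNullable(source.blockFirst(timeout));
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMillis(200);

        System.out.println(firstWithin(Flux.just(100, 200, 300), timeout)); // Optional[100]
        System.out.println(firstWithin(Flux.<Integer>empty(), timeout));    // Optional.empty

        try {
            // Flux.never() emits nothing and never completes, so the wait times out.
            firstWithin(Flux.<Integer>never(), timeout);
        } catch (RuntimeException e) {
            System.out.println("No element within " + timeout + ": " + e.getMessage());
        }
    }
}
```

When the timeout expires, blockFirst(Duration) throws a RuntimeException, which the caller can catch to fall back to a default.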

3. Using take() method

In this approach, we use the take() method. Calling take(n) relays only the first n elements of the Flux and then completes, so take(1) gives a Flux that contains at most the first element.

Java




package com.reactive.app;
  
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
import reactor.core.publisher.Flux;
  
@SpringBootApplication
public class SpringreactorApplication {
  
    public static void main(String[] args) {
        SpringApplication.run(SpringreactorApplication.class, args);
        // Create a Flux with some elements
        Flux<Integer> flux = Flux.just(100, 200, 300, 400, 500);
          
        flux.take(1).subscribe(
                value -> System.out.println("First element of Flux: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Flux completed")
            );
  
    }
}


Output (after the Spring Boot startup logs):

First element of Flux: 100
Flux completed
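
Unlike next(), which returns a Mono, take(n) returns a Flux and works for any n. The following sketch shows this on a long source. The class name TakeSketch and the helper firstN are hypothetical names for this illustration, and block() is used only to keep the demo short.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the names are illustrative only.
class TakeSketch {

    // Returns the first n elements of the source as a list.
    static <T> List<T> firstN(Flux<T> source, long n) {
        return source.take(n)        // Flux<T> limited to n elements
                     .collectList()  // Mono<List<T>>
                     .block();       // blocking only to keep the demo short
    }

    public static void main(String[] args) {
        // A long source; take(n) stops consuming it after n elements.
        Flux<Integer> numbers = Flux.range(1, 1_000_000);

        System.out.println(firstN(numbers, 1));               // [1]
        System.out.println(firstN(numbers, 3));               // [1, 2, 3]
        System.out.println(firstN(Flux.<Integer>empty(), 1)); // []
    }
}
```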

4. Using collectList() method

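In this approach, we use the collectList() method. It collects all the elements of the Flux into a List, which is emitted as a single value once the Flux completes. We then read the first element with the list's get(0) method. Since every element is buffered in memory, this approach suits only small, finite Flux sequences.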

Java




package com.reactive.app;
  
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
import reactor.core.publisher.Flux;
  
@SpringBootApplication
public class SpringreactorApplication {
  
    public static void main(String[] args) {
        SpringApplication.run(SpringreactorApplication.class, args);
        // Create a Flux with some elements
        Flux<Integer> flux = Flux.just(100, 200, 300, 400, 500);
          
        flux.collectList().subscribe(
                list -> {
                    if (!list.isEmpty()) {
                        Integer firstElement = list.get(0);
                        System.out.println("First element of Flux: " + firstElement);
                    } else {
                        System.out.println("Flux is empty");
                    }
                },
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Flux completed")
            );
  
  
    }
}


Output (after the Spring Boot startup logs):

First element of Flux: 100
Flux completed
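
To see why collectList() is the most expensive way to reach the first element, the sketch below counts how many elements the source has to emit under each approach. The class name CollectListCost and the helper emittedVia are hypothetical names for this illustration, and the 1000-element source is an arbitrary choice.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; the names are illustrative only.
class CollectListCost {

    // Counts how many elements the source emits while the given
    // strategy extracts the first element.
    static int emittedVia(Function<Flux<Integer>, Mono<Integer>> firstOf) {
        AtomicInteger emitted = new AtomicInteger();
        Flux<Integer> source = Flux.range(1, 1000)
                                   .doOnNext(n -> emitted.incrementAndGet());
        firstOf.apply(source).block(); // blocking only to keep the demo short
        return emitted.get();
    }

    public static void main(String[] args) {
        int viaList = emittedVia(flux -> flux.collectList().map(list -> list.get(0)));
        int viaNext = emittedVia(Flux::next);

        System.out.println("collectList(): source emitted " + viaList + " elements");
        System.out.println("next(): source emitted " + viaNext + " element(s)");
    }
}
```

With collectList() the whole sequence is consumed before the first element can be read, while next() cancels the source as soon as the first element arrives.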


