Understanding Reactive Pipelines in Spring Webflux.

Understanding Reactive Pipelines in Spring Webflux.

Introduction

With the growing demand for responsive and resilient web applications, Spring 5 introduced Spring WebFlux in 2017, a powerful framework that internally uses Project Reactor's publisher implementations Flux and Mono to provide a non-blocking, asynchronous programming model with reactive pipelines at its core.

Reactive Pipelines

Reactive pipelines allow developers to chain asynchronous operations, enabling applications to handle high-throughput scenarios with lower resource consumption and improved performance. Reactive Pipelines leverage a declarative programming style where data flows through a sequence of operators, each transforming or reacting to the data in a non-blocking fashion. This approach efficiently manages complex asynchronous workflows and streams of data. This approach is ideal for building scalable and resilient web applications.

Lifecycle and Key Methods

1) Starting the Reactive Pipeline :

A reactive pipeline begins with a data producer or source. This is where the reactive sequence is initiated. Common ways to start a pipeline include:

  • Mono.just(T... items): Creates a Mono that emits the specified items. Use this when you have a single item to emit.
  • Flux.just(T... items): Creates a Flux that emits the specified items. Use this when you have multiple items to emit.
  • Mono.error(Throwable throwable): Creates a Mono that emits an error. Useful for error signaling.
  • Flux.fromStream(Stream<? extends T> stream): Creates a Flux that emits items from a Stream. Ideal for converting a Java Stream into a reactive stream.


2) Processing the Pipeline

Once the pipeline is started, you can process and transform the data using various operators:

  • collectList( ): Collects items into a List and returns a Mono<List<T>>. Useful for aggregating results, such as combining multiple records into a single list.
  • flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper): Flattens nested Publisher instances. This helps in handling asynchronous data that requires further asynchronous processing, like chaining multiple network requests.
  • doOnNext(Consumer<? super T> onNext): Performs a side-effect for each item, such as logging or monitoring. For example, you might log each item that passes through the pipeline.
  • doOnError(Consumer<? super Throwable> onError): Handles errors that occur during processing. It’s used to log errors or perform specific actions when something goes wrong.
  • map(Function<? super T, ? extends R> mapper): Transforms each item emitted by the source, altering the data as needed. For instance, you could use map to convert data into a different format.


3) Ending The pipeline

The pipeline concludes with either successful completion or an error. This is where you define the final steps of processing:

  • then( ): Returns a Mono<Void> that completes when the source Mono completes. Useful for chaining actions after the pipeline finishes, such as cleanup tasks.
  • doFinally(Consumer<SignalType> onFinally): Executes a side-effect when the pipeline terminates, regardless of its outcome. Ideal for performing final cleanup actions or releasing resources.
  • onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback): Provides a fallback sequence in case of an error. This allows the pipeline to recover from failures by providing an alternative data source.


4) Subscribing to the pipeline

The lifecycle of a reactive pipeline in Spring WebFlux can be categorized into two main types, each with its distinct approach to starting and managing the pipeline.

  • Manual Subscription: Explicitly subscribes to the pipeline to trigger execution.
  • Framework Managed: The framework manages the subscription automatically in response to HTTP requests.

It's crucial to note that if a reactive pipeline is not managed by the framework, it must be explicitly subscribed to in order to execute and produce results. Without subscription, the pipeline remains idle and does not trigger any data processing or emissions.


A simple example,

Reactive Pipeline lifecycle of data retrieval from an R2DBC (Reactive Relational Database Connectivity) database. Assume ` Book ` is an entity.

public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
    Flux<Book> findAll();
}        

Manual Subscription

Here the Reactive Pipeline is started and managed manually.

@Service
@RequiredArgsConstructor
public class BookService {

    private final BookRepository bookRepository;

    public void fetchBooksManually( ) {
        Flux<Book> bookFlux = bookRepository.findAll( )
            .map(book -> {
                System.out.println("Processing book: " + book.getTitle( ));
                return book;
            });

        bookFlux.subscribe(
            book -> System.out.println("Received book: " + book.getTitle( )),
            error -> System.err.println("Error occurred: " + error.getMessage( )),
            ( ) -> System.out.println("Completed processing books")
        );
    }
}        

fetchBooksManually method creates a reactive pipeline that retrieves a Flux<Book> from the bookRepository , processes each book to print its title, and then manually subscribes to the Flux to print each book's title upon receipt. The subscription also handles errors and completion, logging appropriate messages for each.


Framework Managed

Here the Reactive Pipeline is managed automatically by Spring Webflux.

@RestController
@RequiredArgsConstructor
@RequestMapping("/books")
public class BookController {

    private final BookRepository bookRepository;

    @GetMapping
    public Flux<Book> getBooks() {
        return bookRepository.findAll()
            .map(book -> {
                System.out.println("Processing book: " + book.getTitle());
                return book;
            });
    }
}        

BookController defines a REST endpoint to handle GET requests at ` /books `. The getBooks method uses Spring WebFlux to return a Flux<Book>, which represents a stream of books retrieved from the BookRepository. The WebFlux framework automatically manages the subscription, processing each book as it is emitted from the repository.


A Working Example

Objective:

The goal is to retrieve a list of source names from a repository (R2DBC), process them, and send them to an external API via WebClient. The example illustrates the use of reactive operators to manage the flow of data, handle errors, and perform actions based on the results.

The Repository:

The HistoryRepository interface extends ReactiveCrudRepository, which enables reactive operations on the UserHistory entity. The findAllSourceNames( ) method in this repository uses a custom SQL query to retrieve distinct source names from the userhistory table. This method returns a Flux<String>, which represents a stream of source names that are emitted asynchronously.

public interface HistoryRepository extends ReactiveCrudRepository<UserHistory, Integer> {

    @Query("SELECT DISTINCT source_name FROM userhistory")
    Flux<String> findAllSourceNames();
}        

The Controller:

The HistoryController class exposes an endpoint at ` /api/pref ` that triggers the sendAllSourceNames( ) method from the AnalyticService. When a GET request is made to this endpoint, the sendAllSourceNames( ) method is called, initiating the reactive pipeline. The controller logs a message when the subscription starts and another message when the pipeline terminates, regardless of its outcome.

  @GetMapping("/pref")
    public Mono<Void> send() {   
        return analyticService.sendAllSourceNames()
                .doOnSubscribe(subscription -> System.out.println("Subscribed"))
                .doOnTerminate(() -> System.out.println("Terminated"));
    }        

The AnalyticService:

The AnalyticService class, the sendAllSourceNames( ) method retrieves a Flux<String> of source names from the repository and aggregates them into a List<String> using collectList(). If the list is empty, it logs a message and returns an empty Mono.

If the list contains names, it sends them to an external API with a POST request using WebClient. The method logs success or error messages based on the result of the HTTP request. The doOnTerminate( ) operator logs a final message when the processing completes, and then( ) returns a Mono<Void> to indicate the end of the reactive chain.

 public Mono<Void> sendAllSourceNames() {  

        return historyRepository.findAllSourceNames() 
                .collectList()      
                .doOnNext(sourceNames -> System.out.println("Names" + sourceNames))
                .flatMap(sourceNames -> {
                    if (sourceNames.isEmpty()) {
                        System.out.println("No names found");
                        return Mono.empty();
                    }     
                    return webClient.post()
                            .uri("/api/sources")
                            .bodyValue(sourceNames)  
                            .retrieve()
                            .bodyToMono(Void.class)
                            .doOnSuccess(result -> System.out.println("Successful"))
                            .doOnError(error -> System.err.println("Error" + error.getMessage()));
                })
                .doOnError(error -> System.err.println("Errors" + error.getMessage()))
                .doOnTerminate(() -> System.out.println("Finished"))
                .then(); 
    }        


Conclusion

This article provides an overview of reactive pipelines in Spring WebFlux, highlighting how Project Reactor’s Flux and Mono enable non-blocking, asynchronous processing, essential steps of creating and managing a reactive pipeline, from initiating data sources to handling errors and completion.

Through practical examples from one of my projects, this article demonstrates how reactive programming can be used to efficiently fetch and process data from a repository and interact with external APIs. The goal is to illustrate how adopting a reactive approach can enhance the efficiency and responsiveness of web applications.


References:

Web on Reactive Stack - Spring Documentation

Reactor.core - Project Reactor Documentation

The Reactive Manifesto

Baeldung Articles on Spring Webflux

My Latest Spring Webflux Project

Savindu Rashmika

Full Stack Web Developer

6 个月

????

Reactive programming ?? WebFlux

Denuwan Nammuni

Information Systems (UG) ?? Passionate MERN Stack Developer | React.js Enthusiast | Building Scalable Web Applications | Seeking Internship Opportunities to Drive Innovation

7 个月

Very helpful!

要查看或添加评论,请登录

Pawan Hettiarachchi的更多文章