Understanding Reactive Pipelines in Spring Webflux.
Introduction
With the growing demand for responsive and resilient web applications, Spring 5 introduced Spring WebFlux in 2017, a powerful framework that internally uses Project Reactor's publisher implementations Flux and Mono to provide a non-blocking, asynchronous programming model with reactive pipelines at its core.
Reactive Pipelines
Reactive pipelines allow developers to chain asynchronous operations, enabling applications to handle high-throughput scenarios with lower resource consumption and improved performance. Reactive Pipelines leverage a declarative programming style where data flows through a sequence of operators, each transforming or reacting to the data in a non-blocking fashion. This approach efficiently manages complex asynchronous workflows and streams of data. This approach is ideal for building scalable and resilient web applications.
Lifecycle and Key Methods
1) Starting the Reactive Pipeline :
A reactive pipeline begins with a data producer or source. This is where the reactive sequence is initiated. Common ways to start a pipeline include:
2) Processing the Pipeline
Once the pipeline is started, you can process and transform the data using various operators:
3) Ending The pipeline
The pipeline concludes with either successful completion or an error. This is where you define the final steps of processing:
4) Subscribing to the pipeline
The lifecycle of a reactive pipeline in Spring WebFlux can be categorized into two main types, each with its distinct approach to starting and managing the pipeline.
It's crucial to note that if a reactive pipeline is not managed by the framework, it must be explicitly subscribed to in order to execute and produce results. Without subscription, the pipeline remains idle and does not trigger any data processing or emissions.
A simple example,
Reactive Pipeline lifecycle of data retrieval from an R2DBC (Reactive Relational Database Connectivity) database. Assume ` Book ` is an entity.
public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
Flux<Book> findAll();
}
Manual Subscription
Here the Reactive Pipeline is started and managed manually.
@Service
@RequiredArgsConstructor
public class BookService {
private final BookRepository bookRepository;
public void fetchBooksManually( ) {
Flux<Book> bookFlux = bookRepository.findAll( )
.map(book -> {
System.out.println("Processing book: " + book.getTitle( ));
return book;
});
bookFlux.subscribe(
book -> System.out.println("Received book: " + book.getTitle( )),
error -> System.err.println("Error occurred: " + error.getMessage( )),
( ) -> System.out.println("Completed processing books")
);
}
}
fetchBooksManually method creates a reactive pipeline that retrieves a Flux<Book> from the bookRepository , processes each book to print its title, and then manually subscribes to the Flux to print each book's title upon receipt. The subscription also handles errors and completion, logging appropriate messages for each.
Framework Managed
Here the Reactive Pipeline is managed automatically by Spring Webflux.
@RestController
@RequiredArgsConstructor
@RequestMapping("/books")
public class BookController {
private final BookRepository bookRepository;
@GetMapping
public Flux<Book> getBooks() {
return bookRepository.findAll()
.map(book -> {
System.out.println("Processing book: " + book.getTitle());
return book;
});
}
}
BookController defines a REST endpoint to handle GET requests at ` /books `. The getBooks method uses Spring WebFlux to return a Flux<Book>, which represents a stream of books retrieved from the BookRepository. The WebFlux framework automatically manages the subscription, processing each book as it is emitted from the repository.
A Working Example
Objective:
The goal is to retrieve a list of source names from a repository (R2DBC), process them, and send them to an external API via WebClient. The example illustrates the use of reactive operators to manage the flow of data, handle errors, and perform actions based on the results.
The Repository:
The HistoryRepository interface extends ReactiveCrudRepository, which enables reactive operations on the UserHistory entity. The findAllSourceNames( ) method in this repository uses a custom SQL query to retrieve distinct source names from the userhistory table. This method returns a Flux<String>, which represents a stream of source names that are emitted asynchronously.
public interface HistoryRepository extends ReactiveCrudRepository<UserHistory, Integer> {
@Query("SELECT DISTINCT source_name FROM userhistory")
Flux<String> findAllSourceNames();
}
The Controller:
The HistoryController class exposes an endpoint at ` /api/pref ` that triggers the sendAllSourceNames( ) method from the AnalyticService. When a GET request is made to this endpoint, the sendAllSourceNames( ) method is called, initiating the reactive pipeline. The controller logs a message when the subscription starts and another message when the pipeline terminates, regardless of its outcome.
@GetMapping("/pref")
public Mono<Void> send() {
return analyticService.sendAllSourceNames()
.doOnSubscribe(subscription -> System.out.println("Subscribed"))
.doOnTerminate(() -> System.out.println("Terminated"));
}
The AnalyticService:
The AnalyticService class, the sendAllSourceNames( ) method retrieves a Flux<String> of source names from the repository and aggregates them into a List<String> using collectList(). If the list is empty, it logs a message and returns an empty Mono.
If the list contains names, it sends them to an external API with a POST request using WebClient. The method logs success or error messages based on the result of the HTTP request. The doOnTerminate( ) operator logs a final message when the processing completes, and then( ) returns a Mono<Void> to indicate the end of the reactive chain.
public Mono<Void> sendAllSourceNames() {
return historyRepository.findAllSourceNames()
.collectList()
.doOnNext(sourceNames -> System.out.println("Names" + sourceNames))
.flatMap(sourceNames -> {
if (sourceNames.isEmpty()) {
System.out.println("No names found");
return Mono.empty();
}
return webClient.post()
.uri("/api/sources")
.bodyValue(sourceNames)
.retrieve()
.bodyToMono(Void.class)
.doOnSuccess(result -> System.out.println("Successful"))
.doOnError(error -> System.err.println("Error" + error.getMessage()));
})
.doOnError(error -> System.err.println("Errors" + error.getMessage()))
.doOnTerminate(() -> System.out.println("Finished"))
.then();
}
Conclusion
This article provides an overview of reactive pipelines in Spring WebFlux, highlighting how Project Reactor’s Flux and Mono enable non-blocking, asynchronous processing, essential steps of creating and managing a reactive pipeline, from initiating data sources to handling errors and completion.
Through practical examples from one of my projects, this article demonstrates how reactive programming can be used to efficiently fetch and process data from a repository and interact with external APIs. The goal is to illustrate how adopting a reactive approach can enhance the efficiency and responsiveness of web applications.
References:
Full Stack Web Developer
6 个月????
Reactive programming ?? WebFlux
Information Systems (UG) ?? Passionate MERN Stack Developer | React.js Enthusiast | Building Scalable Web Applications | Seeking Internship Opportunities to Drive Innovation
7 个月Very helpful!