Series 2: Intermediate Concepts in Reactive Programming
Puneet Tripathi
Introduction
Welcome back! In the first article (Series Part 1), I introduced the fundamentals of reactive programming. Now we'll delve into intermediate concepts that will help you build more complex and efficient reactive applications.
Chaining Reactive Streams
Reactive programming allows you to chain multiple asynchronous operations seamlessly.
Example: Fetching User and Orders
Let's fetch a user's details and their orders, then combine the results.
public Mono<UserOrders> getUserOrders(String userId) {
    return userRepository.findById(userId)
        .flatMap(user -> orderService.getOrdersForUser(userId)
            .collectList()
            .map(orders -> new UserOrders(user, orders))
        );
}
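Explanation:
findById: returns a Mono<User> that emits the user once the lookup completes.
flatMap: waits for the user, then subscribes to the orders stream and flattens the inner Mono into the outer chain.
collectList: gathers the Flux of orders into a single Mono<List<Order>> that emits when the Flux completes.
map: synchronously combines the user and the order list into a UserOrders object.
If findById completes without emitting a user, the flatMap never runs and the returned Mono completes empty.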
Error Handling
Graceful error handling is crucial for resilience.
Example: Handling Errors with Fallback
public Mono<Product> getProduct(String productId) {
    return productService.getProductById(productId)
        .onErrorResume(e -> {
            // Log the error and return a default product
            log.error("Error fetching product", e);
            return Mono.just(new Product("default", "Default Product"));
        });
}
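Explanation:
onErrorResume: when getProductById signals an error, the chain subscribes to the fallback Mono instead of propagating the error to the caller.
Fallback value: the caller receives the default product and cannot tell that the lookup failed, so the log entry is the only trace of the problem.
Scope: this lambda catches every error type. The overload onErrorResume(Class, Function) limits the fallback to specific exceptions and lets the rest propagate.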
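The fallback behaviour can be tried without any Reactor dependency. The sketch below is a plain-JDK analogue, not Reactor code: CompletableFuture.exceptionally stands in for onErrorResume, and the FallbackSketch class, its Product type, and the lookup that fails for the id "broken" are hypothetical stand-ins for the article's productService.

```java
import java.util.concurrent.CompletableFuture;

class FallbackSketch {

    // Hypothetical stand-in for the article's Product type.
    static final class Product {
        final String id;
        final String name;

        Product(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hypothetical lookup: fails for the id "broken", succeeds otherwise.
    static CompletableFuture<Product> fetchProduct(String productId) {
        if ("broken".equals(productId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("backend unavailable"));
        }
        return CompletableFuture.completedFuture(
                new Product(productId, "Product " + productId));
    }

    // Same shape as getProduct above: on any failure, log and substitute a default.
    static CompletableFuture<Product> getProduct(String productId) {
        return fetchProduct(productId)
                .exceptionally(e -> {
                    System.err.println("Error fetching product: " + e.getMessage());
                    return new Product("default", "Default Product");
                });
    }

    public static void main(String[] args) {
        System.out.println(getProduct("42").join().name);     // prints "Product 42"
        System.out.println(getProduct("broken").join().name); // prints "Default Product"
    }
}
```

As in the Reactor version, the caller of getProduct never sees the exception. Whether a silent default is acceptable depends on the use case; for data the user acts on, propagating the error may be the safer choice.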
Managing Backpressure
Control the flow of data to prevent overwhelming your application.
Example: Processing Items with Rate Limiting
public Flux<Item> processItems() {
    return itemService.getItems()
        .limitRate(50)
        .flatMap(this::processItem);
}
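Explanation:
limitRate(50): caps the demand sent upstream. Items are requested from getItems() in batches of at most 50, and more are requested as earlier ones are consumed. This limits how many items are in flight, not how many are processed per second, so it is not a time-based throttle.
flatMap: subscribes to processItem for each item and merges the results as they complete. Several items can be in progress at once, and results may arrive in a different order than the source emitted them.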
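To make the demand mechanism concrete, here is a minimal sketch of the request(n) protocol that operators such as limitRate build on. It uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor, and everything in it (BatchedDemandSketch, RangePublisher, BatchingSubscriber) is hypothetical illustration code. It is also simplified: it runs on a single thread, skips the validation a compliant publisher needs, and asks for the next batch only when the current one is fully consumed, whereas Reactor's limitRate replenishes earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchedDemandSketch {

    // Hypothetical source: emits 1..count on the caller's thread,
    // never emitting more than the subscriber has requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (draining) return; // re-entrant call from onNext; outer loop picks it up
                    draining = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Requests items in fixed-size batches and records every request it makes.
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Long> requests = new ArrayList<>();
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private final long batchSize;
        private Flow.Subscription subscription;
        private long consumedInBatch;

        BatchingSubscriber(long batchSize) {
            this.batchSize = batchSize;
        }

        private void requestBatch() {
            requests.add(batchSize);
            subscription.request(batchSize);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (++consumedInBatch == batchSize) {
                consumedInBatch = 0;
                requestBatch();
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        BatchingSubscriber sub = new BatchingSubscriber(50);
        new RangePublisher(120).subscribe(sub);
        // prints "120 items, requests: [50, 50, 50]"
        System.out.println(sub.received.size() + " items, requests: " + sub.requests);
    }
}
```

The publisher never runs ahead of the subscriber: all 120 items arrive, but only because the subscriber asked for them in three batches of 50.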
Combining Streams
Merge multiple data sources effortlessly.
Example: Combining Data from Two Services
public Mono<UserProductInfo> getUserProductInfo(String userId, String productId) {
    Mono<User> userMono = userService.getUserById(userId);
    Mono<Product> productMono = productService.getProductById(productId);
    return Mono.zip(userMono, productMono)
        .map(tuple -> new UserProductInfo(tuple.getT1(), tuple.getT2()));
}
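Explanation:
Mono.zip: subscribes to both sources and emits a single Tuple2 once both have produced a value, so the two lookups do not have to wait for each other.
getT1 / getT2: return the user and the product from the tuple, in the order the sources were passed to zip.
Failure and empty cases: if either source fails, the combined Mono fails; if either completes without a value, the combined Mono completes empty.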
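For readers more familiar with the JDK's own futures, the same "wait for both, then combine" shape can be sketched with CompletableFuture.thenCombine. This is an analogue rather than Reactor code, and the ZipSketch class with its two fetch methods is a hypothetical stand-in for userService and productService.

```java
import java.util.concurrent.CompletableFuture;

class ZipSketch {

    // Hypothetical stand-in for userService.getUserById: runs asynchronously.
    static CompletableFuture<String> fetchUserName(String userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    // Hypothetical stand-in for productService.getProductById: runs asynchronously.
    static CompletableFuture<String> fetchProductName(String productId) {
        return CompletableFuture.supplyAsync(() -> "product-" + productId);
    }

    // Both lookups are started before either result is awaited,
    // and the combiner runs once both have completed.
    static CompletableFuture<String> getUserProductInfo(String userId, String productId) {
        CompletableFuture<String> user = fetchUserName(userId);
        CompletableFuture<String> product = fetchProductName(productId);
        return user.thenCombine(product, (u, p) -> u + " / " + p);
    }

    public static void main(String[] args) {
        // prints "user-7 / product-42"
        System.out.println(getUserProductInfo("7", "42").join());
    }
}
```

One difference worth keeping in mind: a CompletableFuture starts its work as soon as it is created, while a Mono does nothing until something subscribes to it. In the Reactor version, the two service calls only begin when the zipped Mono is subscribed.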
Parallel Processing
Enhance performance by processing independent tasks concurrently.
Example: Processing Orders in Parallel
public Flux<OrderResult> processOrders() {
    return orderService.getAllOrders()
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap(this::processOrder)
        .sequential();
}
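Explanation:
parallel: splits the Flux into several "rails" (by default, one per CPU core) and returns a ParallelFlux. On its own it does not change which thread does the work.
runOn(Schedulers.parallel()): runs each rail on its own worker thread. Schedulers.parallel() is meant for CPU-bound, non-blocking work; blocking calls belong on Schedulers.boundedElastic() instead.
flatMap: applies processOrder to the orders on each rail.
sequential: merges the rails back into a single Flux. The order of the merged results should not be relied on.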
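The fan-out and merge pattern can also be illustrated with plain JDK executors. The sketch below is an analogue, not Reactor code: a fixed thread pool plays the role of the rails, and an ExecutorCompletionService hands back results in completion order, which is why the merged output is not guaranteed to match the input order. ParallelSketch and its processOrder method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelSketch {

    // Hypothetical per-order work; stands in for processOrder in the article.
    static String processOrder(int orderId) {
        return "processed-" + orderId;
    }

    // Fans the orders out to `workers` threads and collects results
    // in the order the tasks finish, not the order they were submitted.
    static List<String> processOrders(List<Integer> orderIds, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            CompletionService<String> completions = new ExecutorCompletionService<>(pool);
            for (int id : orderIds) {
                completions.submit(() -> processOrder(id));
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < orderIds.size(); i++) {
                results.add(completions.take().get()); // next finished task, whichever it is
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Contains all four results; their order may vary between runs.
        System.out.println(processOrders(List.of(1, 2, 3, 4), 2));
    }
}
```

If callers need results in the original order, they have to restore it themselves, for example by carrying an index with each item and sorting at the end.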
Example: Building a Reactive Service
To solidify our understanding, let's build a reactive service that fetches user information, retrieves their associated orders, processes each order in parallel, and handles potential errors gracefully.
File: UserOrderService.java
package com.example.reactive.service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.Collections;
import java.util.List;

public class UserOrderService {

    private final UserService userService;
    private final OrderService orderService;
    private final PaymentService paymentService;

    public UserOrderService(UserService userService, OrderService orderService, PaymentService paymentService) {
        this.userService = userService;
        this.orderService = orderService;
        this.paymentService = paymentService;
    }

    public Mono<UserOrderSummary> getUserOrderSummary(String userId) {
        return userService.getUserById(userId)
            .flatMap(user -> orderService.getOrdersForUser(userId)
                .collectList()
                .map(orders -> new UserOrders(user, orders)))
            .flatMap(userOrders -> processOrders(userOrders.getOrders())
                .collectList()
                .map(processedOrders -> new UserOrderSummary(userOrders.getUser(), processedOrders)))
            .onErrorResume(e -> {
                // Log the error and return a default summary
                System.err.println("Failed to fetch user order summary: " + e.getMessage());
                return Mono.just(new UserOrderSummary(new User("unknown", "Unknown User"), Collections.emptyList()));
            });
    }

    private Flux<OrderResult> processOrders(List<Order> orders) {
        return Flux.fromIterable(orders)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(order -> paymentService.processPayment(order)
                .map(paymentResult -> new OrderResult(order, paymentResult))
                .onErrorResume(e -> {
                    // Log the error and provide a fallback
                    System.err.println("Payment processing failed for order " + order.getId() + ": " + e.getMessage());
                    return Mono.just(new OrderResult(order, PaymentResult.failed()));
                }))
            .sequential();
    }
}
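Explanation:
getUserOrderSummary loads the user, collects their orders, runs each order through payment processing, and wraps the results in a UserOrderSummary. A payment failure is handled per order inside processOrders, so one failed payment produces an OrderResult marked as failed instead of aborting the whole summary. Any other error in the chain reaches the outer onErrorResume, which logs it and returns a placeholder summary for an unknown user.
Key reactive operators used in the UserOrderService class:
flatMap: maps each element to a new publisher and flattens the result into the chain. Used here to chain the user lookup, the order retrieval, and the payment call.
map: transforms each element synchronously. Used here to build UserOrders, OrderResult, and UserOrderSummary objects.
collectList: gathers all elements of a Flux into a single Mono<List<T>> that emits once the Flux completes.
onErrorResume: switches to a fallback publisher when an error occurs, instead of propagating the error.
parallel: splits a Flux into multiple rails so that elements can be processed concurrently.
runOn: chooses the scheduler whose worker threads execute the rails.
sequential: merges the rails back into a single Flux.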
These operators, combined, enable chaining of asynchronous operations, parallel processing, error handling, and data transformation, which are key to building efficient, resilient, and non-blocking reactive applications.
Ever wondered how to handle errors gracefully in a complex chain of operations? Or how to prevent your application from being overwhelmed by data? Stay tuned for the next article, where we'll explore advanced techniques and best practices that will take your reactive programming skills to the next level.
Enjoying the journey? Don't forget to like and follow to catch the next installment!