Series 2: Intermediate Concepts in Reactive Programming

See these articles before you start


  1. Unlocking the Power of Reactive Programming in Java https://www.dhirubhai.net/pulse/unlocking-power-reactive-programming-java-puneet-tripathi-xhwef
  2. Series Part 1: Unveiling the Magic of Reactive Programming with Java and Spring Boot https://www.dhirubhai.net/pulse/series-part-1-unveiling-magic-reactive-programming-java-tripathi-7vcqf/


Introduction

Welcome back! In the first article (Series Part 1), I introduced the fundamentals of reactive programming. Now we'll delve into intermediate concepts that will help you build more complex and efficient reactive applications.

Chaining Reactive Streams

Reactive programming allows you to chain multiple asynchronous operations seamlessly.

Example: Fetching User and Orders

Let's fetch a user's details and their orders, then combine the results.

public Mono<UserOrders> getUserOrders(String userId) {
    return userRepository.findById(userId)
        .flatMap(user -> orderService.getOrdersForUser(userId)
            .collectList()
            .map(orders -> new UserOrders(user, orders))
        );
}
        

Explanation:

  • flatMap: Used to chain asynchronous operations that return a Mono or Flux.
  • collectList: Converts a Flux<Order> into a Mono<List<Order>>.
  • map: Transforms the data synchronously.
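
To try the chain without a database, here is a minimal runnable sketch. It assumes reactor-core is on the classpath; findUser and ordersFor are hypothetical in-memory stand-ins for userRepository and orderService, and plain strings replace the User and Order types.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ChainingSketch {

    // Hypothetical stand-in for userRepository.findById: emits one user.
    static Mono<String> findUser(String userId) {
        return Mono.just("user-" + userId);
    }

    // Hypothetical stand-in for orderService.getOrdersForUser: emits several orders.
    static Flux<String> ordersFor(String userId) {
        return Flux.just("order-A", "order-B");
    }

    // flatMap chains the second asynchronous call, collectList turns the
    // Flux into a Mono<List<String>>, and map builds the combined value.
    static Mono<String> userOrders(String userId) {
        return findUser(userId)
            .flatMap(user -> ordersFor(userId)
                .collectList()
                .map(orders -> user + " -> " + orders));
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this demo.
        System.out.println(userOrders("42").block());
    }
}
```

Nothing runs until something subscribes; in a Spring WebFlux controller you would return the Mono and let the framework subscribe instead of calling block().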

Error Handling

Graceful error handling is crucial for resilience.

Example: Handling Errors with Fallback

public Mono<Product> getProduct(String productId) {
    return productService.getProductById(productId)
        .onErrorResume(e -> {
            // Log the error and return a default product
            log.error("Error fetching product", e);
            return Mono.just(new Product("default", "Default Product"));
        });
}
        

Explanation:

  • onErrorResume: Catches exceptions and provides a fallback Mono.
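
A fallback for every error can hide bugs, so it is often useful to fall back only for failures you expect. The sketch below uses the onErrorResume overload that takes an exception class. It assumes reactor-core is on the classpath; lookup and its failure rules are hypothetical.

```java
import reactor.core.publisher.Mono;

class FallbackSketch {

    // Hypothetical lookup: "broken" simulates an outage, "invalid" a caller bug.
    static Mono<String> lookup(String productId) {
        if ("broken".equals(productId)) {
            return Mono.error(new IllegalStateException("service unavailable"));
        }
        if ("invalid".equals(productId)) {
            return Mono.error(new IllegalArgumentException("bad product id"));
        }
        return Mono.just("Product " + productId);
    }

    // Fall back only for IllegalStateException; other errors still propagate.
    static Mono<String> lookupWithFallback(String productId) {
        return lookup(productId)
            .onErrorResume(IllegalStateException.class,
                e -> Mono.just("Default Product"));
    }

    public static void main(String[] args) {
        System.out.println(lookupWithFallback("p1").block());
        System.out.println(lookupWithFallback("broken").block());
    }
}
```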

Managing Backpressure

Control the flow of data to prevent overwhelming your application.

Example: Processing Items with Rate Limiting

public Flux<Item> processItems() {
    return itemService.getItems()
        .limitRate(50)
        .flatMap(this::processItem);
}
        

Explanation:

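  • limitRate(50): Caps the demand sent upstream, so items are requested from itemService in batches of at most 50 and replenished as they are consumed, even if the downstream asks for more.
  • Prevents resource exhaustion by bounding how much data is in flight between the source and flatMap.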
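
One way to see what limitRate does is to record the request signals that reach the source. This sketch assumes reactor-core is on the classpath; Flux.range stands in for itemService.getItems().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

class LimitRateSketch {

    // Records every request(n) that limitRate(50) sends to the source.
    static List<Long> upstreamRequests() {
        List<Long> requests = new CopyOnWriteArrayList<>();
        Flux.range(1, 200)
            .hide()                              // disable operator fusion so request signals are visible
            .doOnRequest(n -> requests.add(n))   // observe demand reaching the source
            .limitRate(50)                       // cap each upstream request at 50
            .blockLast();                        // the subscriber itself asks for everything
        return requests;
    }

    public static void main(String[] args) {
        // The first request is 50; later requests are smaller replenishments.
        System.out.println(upstreamRequests());
    }
}
```

Even though blockLast() requests an unbounded amount, the source never sees a request larger than 50.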

Combining Streams

Merge multiple data sources effortlessly.

Example: Combining Data from Two Services

public Mono<UserProductInfo> getUserProductInfo(String userId, String productId) {
    Mono<User> userMono = userService.getUserById(userId);
    Mono<Product> productMono = productService.getProductById(productId);

    return Mono.zip(userMono, productMono)
        .map(tuple -> new UserProductInfo(tuple.getT1(), tuple.getT2()));
}
        

Explanation:

  • Mono.zip: Combines multiple Mono instances and waits for all to emit.
  • tuple.getT1() and tuple.getT2(): Retrieve the results from the zipped Monos.
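
Mono.zip also has an overload that takes a combinator function, which avoids the intermediate tuple. The sketch below assumes reactor-core is on the classpath and uses strings in place of User and Product. It also shows that the zipped Mono completes empty when either source is empty.

```java
import reactor.core.publisher.Mono;

class ZipSketch {

    // Zips two Monos and combines their values directly, without a tuple.
    static Mono<String> combine(Mono<String> user, Mono<String> product) {
        return Mono.zip(user, product, (u, p) -> u + " bought " + p);
    }

    public static void main(String[] args) {
        System.out.println(combine(Mono.just("Ada"), Mono.just("Laptop")).block());
        // If either source completes without a value, so does the zipped Mono,
        // and block() returns null.
        System.out.println(combine(Mono.just("Ada"), Mono.empty()).block());
    }
}
```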

Parallel Processing

Enhance performance by processing independent tasks concurrently.

Example: Processing Orders in Parallel

public Flux<OrderResult> processOrders() {
    return orderService.getAllOrders()
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap(this::processOrder)
        .sequential();
}
        

Explanation:

  • parallel(): Splits the Flux into multiple rails for parallel processing.
  • runOn(Schedulers.parallel()): Specifies the scheduler to use.
  • sequential(): Merges the parallel streams back into a single Flux.
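
Here is a small runnable sketch of the same operator sequence, assuming reactor-core is on the classpath. The work is a simple squaring step, not a real order process. Because rails run on different threads, the merged order depends on timing, so the sketch sorts the result before returning it.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ParallelSketch {

    // Squares 1..10 on four rails and returns the results in sorted order.
    static List<Integer> squares() {
        return Flux.range(1, 10)
            .parallel(4)                    // split into 4 rails
            .runOn(Schedulers.parallel())   // run each rail on a worker thread
            .map(i -> i * i)                // work done concurrently per rail
            .sequential()                   // merge rails back into one Flux
            .collectSortedList()            // sort, since merge order is timing-dependent
            .block();
    }

    public static void main(String[] args) {
        System.out.println(squares());
    }
}
```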


Example: Building a Reactive Service

To solidify our understanding, let's build a reactive service that fetches user information, retrieves their associated orders, processes each order in parallel, and handles potential errors gracefully.

File: UserOrderService.java

package com.example.reactive.service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Collections;
import java.util.List;

public class UserOrderService {

    private final UserService userService;
    private final OrderService orderService;
    private final PaymentService paymentService;

    public UserOrderService(UserService userService, OrderService orderService, PaymentService paymentService) {
        this.userService = userService;
        this.orderService = orderService;
        this.paymentService = paymentService;
    }

    public Mono<UserOrderSummary> getUserOrderSummary(String userId) {
        return userService.getUserById(userId)
            .flatMap(user -> orderService.getOrdersForUser(userId)
                .collectList()
                .map(orders -> new UserOrders(user, orders)))
            .flatMap(userOrders -> processOrders(userOrders.getOrders())
                .collectList()
                .map(processedOrders -> new UserOrderSummary(userOrders.getUser(), processedOrders)))
            .onErrorResume(e -> {
                // Log the error and return a default summary
                System.err.println("Failed to fetch user order summary: " + e.getMessage());
                return Mono.just(new UserOrderSummary(new User("unknown", "Unknown User"), Collections.emptyList()));
            });
    }

    private Flux<OrderResult> processOrders(List<Order> orders) {
        return Flux.fromIterable(orders)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(order -> paymentService.processPayment(order)
                .map(paymentResult -> new OrderResult(order, paymentResult))
                .onErrorResume(e -> {
                    // Log the error and provide a fallback
                    System.err.println("Payment processing failed for order " + order.getId() + ": " + e.getMessage());
                    return Mono.just(new OrderResult(order, PaymentResult.failed()));
                }))
            .sequential();
    }
}        

Explanation:

  • Chaining Reactive Streams: We start by fetching the user using userService.getUserById(userId), then retrieve their orders with orderService.getOrdersForUser(userId), chaining these operations with flatMap and map.
  • Error Handling: We use onErrorResume to catch any exceptions that occur during the process, logging the error and providing a fallback UserOrderSummary with default values.
  • Parallel Processing: The processOrders method processes each order in parallel using parallel(), runOn(Schedulers.parallel()), and flatMap.
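  • Backpressure Management: processOrders does not call limitRate, so it relies on the bounded demand built into the operators it uses: parallel() and flatMap each request a limited number of items from upstream at a time rather than pulling the whole list at once. To cap how many payments reach paymentService concurrently, pass an explicit parallelism to parallel() or a concurrency limit to flatMap.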
  • Combining Streams: The processed orders are collected back into a UserOrderSummary, combining the user information with the list of processed orders.
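
If the payment backend can only handle a few calls at once, one option is the flatMap overload that takes a concurrency limit. This is a sketch under assumptions, not the service's actual code: reactor-core is on the classpath, pay is a hypothetical payment call simulated with a 20 ms delay, and the counters exist only to show how many calls overlap.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class BoundedConcurrencySketch {

    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical payment call: takes 20 ms and tracks how many calls overlap.
    static Mono<String> pay(int orderId) {
        return Mono.delay(Duration.ofMillis(20))
            .doOnSubscribe(s -> maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
            .doOnNext(tick -> inFlight.decrementAndGet())
            .map(tick -> "paid-" + orderId);
    }

    // The second argument to flatMap caps concurrent inner subscriptions at 4.
    static List<String> payAll() {
        return Flux.range(1, 20)
            .flatMap(BoundedConcurrencySketch::pay, 4)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(payAll().size() + " payments, at most "
            + maxInFlight.get() + " in flight at once");
    }
}
```

Unlike parallel(), this keeps everything on one logical stream; it limits how many asynchronous calls are outstanding rather than how many threads do CPU work.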


Key reactive operators used in the UserOrderService class:

flatMap:

  • Purpose: Chains asynchronous operations that return a Mono or Flux.
  • Usage: Used to fetch orders after retrieving the user and to process each order in processOrders.
  • How It Works: flatMap takes each item emitted by the source and applies a function that returns another Publisher, then flattens and emits the results from all these inner publishers in a single stream.

map:

  • Purpose: Transforms the data within a Mono or Flux synchronously.
  • Usage: Used to create UserOrders by combining User and List<Order> and later to create UserOrderSummary by combining User and List<OrderResult>.
  • How It Works: map applies a function to each emitted item. The element type may change (e.g., Mono<T> becomes Mono<R>), but the publisher kind does not: a Mono stays a Mono and a Flux stays a Flux.
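
A short sketch of the difference between map and flatMap, assuming reactor-core is on the classpath. The parsing and doubling steps are illustrative only.

```java
import reactor.core.publisher.Mono;

class MapSketch {

    // map converts the element type (String to Integer); the container stays a Mono.
    static Mono<Integer> parse(Mono<String> text) {
        return text.map(Integer::parseInt);
    }

    // flatMap is needed when the function itself returns a Mono;
    // using map here would produce a Mono<Mono<Integer>>.
    static Mono<Integer> parseThenDouble(Mono<String> text) {
        return parse(text).flatMap(n -> Mono.just(n * 2));
    }

    public static void main(String[] args) {
        System.out.println(parse(Mono.just("21")).block());
        System.out.println(parseThenDouble(Mono.just("21")).block());
    }
}
```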

collectList:

  • Purpose: Collects all items emitted by a Flux into a List, emitting that list as a single Mono.
  • Usage: Used to aggregate all Order and OrderResult items into lists for further processing.
  • How It Works: Once the Flux completes, collectList gathers all items emitted into a single List and wraps it in a Mono.
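
One detail worth knowing: when the Flux is empty, collectList emits an empty list and does not complete empty. That is why a user with no orders still produces a summary. A minimal sketch, assuming reactor-core is on the classpath:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class CollectListSketch {

    // Collects three items into one list.
    static List<String> collected() {
        return Flux.just("a", "b", "c").collectList().block();
    }

    // An empty Flux yields an empty list, not null.
    static List<String> collectedFromEmpty() {
        return Flux.<String>empty().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(collected());
        System.out.println(collectedFromEmpty());
    }
}
```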

onErrorResume:

  • Purpose: Provides fallback behavior in case of errors in the reactive stream.
  • Usage: Applied at two points in the code: one at the global level to handle errors for the entire getUserOrderSummary chain and another within processOrders to handle errors specific to each order processing.
  • How It Works: When an error is emitted, onErrorResume intercepts it and subscribes to an alternative Mono or Flux. The original sequence still terminates; the subscriber receives the fallback's items instead of the error.
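
Where the fallback sits changes what it replaces. The sketch below contrasts a fallback inside flatMap with one on the outer chain. It assumes reactor-core is on the classpath; pay is a hypothetical payment call that fails for order 2 only.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FallbackScopeSketch {

    // Hypothetical payment call that fails for order 2 only.
    static Mono<String> pay(int orderId) {
        if (orderId == 2) {
            return Mono.error(new IllegalStateException("declined"));
        }
        return Mono.just("paid-" + orderId);
    }

    // Fallback inside flatMap: only the failing order is replaced.
    static List<String> perItemFallback() {
        return Flux.just(1, 2, 3)
            .flatMap(id -> pay(id).onErrorResume(e -> Mono.just("failed-" + id)))
            .collectList()
            .block();
    }

    // Fallback on the outer chain: the error ends the source sequence,
    // so order 3 is never processed and the fallback takes over.
    static List<String> globalFallback() {
        return Flux.just(1, 2, 3)
            .flatMap(id -> pay(id))
            .onErrorResume(e -> Flux.just("fallback"))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(perItemFallback());
        System.out.println(globalFallback());
    }
}
```

This is why processOrders handles payment errors per order, while getUserOrderSummary keeps a broader fallback for everything else.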

parallel:

  • Purpose: Splits the Flux into multiple parallel “rails” (streams) for concurrent processing.
  • Usage: Used in processOrders to enable parallel processing of each order.
  • How It Works: parallel divides the Flux stream into a number of parallel lanes, where each lane can process items concurrently.

runOn:

  • Purpose: Specifies the scheduler (thread pool) for executing the parallelized Flux.
  • Usage: Used in processOrders with Schedulers.parallel() to designate a thread pool for CPU-intensive or parallel tasks.
  • How It Works: When used after parallel, runOn assigns a scheduler to each rail, determining which threads will handle the parallel execution.

sequential:

  • Purpose: Merges parallel streams back into a single Flux.
  • Usage: Used at the end of processOrders to merge the parallel results back into a sequential stream.
  • How It Works: sequential takes items from all parallel rails and merges them into a single Flux as they become available. The original order is not guaranteed; sort or re-key the results if order matters.
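
Because rails run concurrently, results merged by sequential() can arrive in a different order from the source. When source order matters and the work is asynchronous, one alternative is flatMapSequential, which runs the inner publishers concurrently but emits their results in source order. A sketch assuming reactor-core is on the classpath, with delays standing in for real work:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class OrderedSketch {

    // Later items finish first (shorter delay), yet flatMapSequential
    // emits the results in source order.
    static List<Integer> orderedResults() {
        return Flux.range(1, 5)
            .flatMapSequential(i ->
                Mono.delay(Duration.ofMillis(60 - i * 10L)).map(tick -> i * 10))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(orderedResults());
    }
}
```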


These operators, combined, enable chaining of asynchronous operations, parallel processing, error handling, and data transformation, which are key to building efficient, resilient, and non-blocking reactive applications.


Ever wondered how to handle errors gracefully in a complex chain of operations? Or how to prevent your application from being overwhelmed by data? Stay tuned for the next article, where we'll explore advanced techniques and best practices that will take your reactive programming skills to the next level.

Enjoying the journey? Don't forget to like and follow to catch the next installment!
