How to Use Stream.gather in Java 24 for More Powerful Stream Processing


While traveling to Belo Horizonte, the capital of Minas Gerais, Brazil, to catch The Offspring live (one of my all-time favorite bands, by the way), I found myself thinking about my next blog post. Just last week, I presented some Java 24 changes to a large audience, even though Java 24 hasn't officially launched yet (it's set for March 18, 2025). That's when I had an insight: developers can start using Stream.gather now to write cleaner, more efficient stream-based code, sharpening their skills and staying ahead of the launch.

One of the major highlights in Java 24, Stream.gather is a powerful feature that enhances stream processing by allowing custom intermediate operations. This article provides an easy-to-follow guide on how Stream.gather works, the problems it solves, and how to use it effectively.

If you want to start using Stream.gather today, you have two options. You can download the latest release candidate for JDK 24 from jdk.java.net/24. Alternatively, on JDK 23, where gatherers are still a preview feature, pass --enable-preview --release 23 to javac when compiling and --enable-preview to java when running.

How It Works

Stream.gather introduces gatherers: user-defined processors that transform stream elements in flexible ways. Unlike the fixed set of built-in intermediate operations such as map and filter, gatherers can perform:

  • One-to-one transformations (like map)
  • One-to-many, many-to-many, and many-to-one transformations
  • Stateful transformations that track previously seen elements
  • Short-circuiting operations that can turn infinite streams into finite ones

A gatherer is defined by four functions that work together. Only the integrator is required; the other three are optional:

  • Initializer: creates the intermediate state object (if needed).
  • Integrator: processes each incoming element, possibly using the intermediate state, and optionally pushes output elements downstream. It returns false to stop consuming input early.
  • Combiner: merges two intermediate states into one when the gatherer is evaluated in parallel.
  • Finisher: runs once at the end of the input stream, receiving the final intermediate state, and can push any remaining output elements.
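The following sketch makes these roles concrete. It is a one-to-many gatherer that supplies only an integrator, the one mandatory function. The class name IntegratorOnly and the helper repeatEach are hypothetical and used only for illustration; they are not JDK API. Gatherer.of and Gatherer.Integrator.ofGreedy are the JDK 24 factory methods.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class IntegratorOnly {
    // Hypothetical helper: a stateless one-to-many gatherer built from an integrator alone.
    // Gatherer.of(integrator) needs no initializer, combiner, or finisher.
    static <T> Gatherer<T, ?, T> repeatEach(int times) {
        return Gatherer.of(Gatherer.Integrator.<Void, T, T>ofGreedy((state, element, downstream) -> {
            for (int i = 0; i < times; i++) {
                // push returns false once downstream wants no more elements
                if (!downstream.push(element)) {
                    return false;
                }
            }
            return true; // ask for the next element
        }));
    }

    static List<String> run() {
        return Stream.of("a", "b", "c")
                .gather(repeatEach(2))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, a, b, b, c, c]
    }
}
```

Because this gatherer keeps no state, it needs only the integrator. The integrator checks the boolean returned by push, so it also stops early if a later operation such as limit no longer wants elements.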

Which Problem It Can Solve

Java streams have been great for functional programming, but they lacked a way to define custom intermediate operations. Stream.gather fills this gap, making complex tasks like grouping, scanning, or stateful filtering more intuitive and efficient.

For example, imagine you have a list of students and want to split them into groups of three. Before Java 24, you would do it like this:

import java.util.stream.IntStream;

public class BeforeJava24 {
    private record Student(String name) {}

    public static void main(String[] args) {
        var batchSize = 3;
        var students = IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new Student(String.format("Student #%d", i)))
                .toList();

        var groups = IntStream
                .range(0, students.size() % batchSize == 0 ? students.size() / batchSize : students.size() / batchSize + 1)
                .mapToObj(i -> students.subList(i * batchSize, Math.min((i + 1) * batchSize, students.size())))
                .toList();

        groups.forEach(System.out::println);
    }
}        

With Java 24, you can achieve the same result more cleanly, without any index arithmetic, using Stream.gather:

import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class AfterJava24 {
    private record Student(String name) {}

    public static void main(String[] args) {
        var students = IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new Student(String.format("Student #%d", i)))
                .toList();

        var groups = students.stream()
                .gather(Gatherers.windowFixed(3))
                .toList();

        groups.forEach(System.out::println);
    }
}        

Notice how the code becomes cleaner and less complex with Stream.gather.

Built-in Gatherers

Java 24 provides several ready-to-use gatherers in java.util.stream.Gatherers:

  • fold is a stateful many-to-one gatherer that performs an ordered, reduction-like transformation.

Before Java 24, you would probably do it as follows:

import java.util.Arrays;

public class Fold {
    public static void main(String[] args) {
        var numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        var total = numbers.stream().mapToInt(Integer::intValue).sum();
        System.out.println(total);
    }
}        

Using fold in Java 24, the code looks like this:

import java.util.Arrays;
import java.util.stream.Gatherers;

public class Fold {
    public static void main(String[] args) {
        var numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // sums up all the integers in the list
        var total = numbers.stream()
                .gather(Gatherers.fold(() -> 0, Integer::sum)) // starts at zero and adds each element to the running total
                .findFirst().orElse(0);
        System.out.println(total);
    }
}        

Note that the example above needs no mapToInt. fold works directly on the Integer elements, starting from zero, and emits the total as a single element, which findFirst() then retrieves.
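fold goes beyond sum() when the result type differs from the element type. In the sketch below, the Integer elements are folded into a String; FoldToString is a hypothetical class name. Because fold has no combiner, it suits order-dependent reductions like concatenation.

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FoldToString {
    static String run() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                // the state is a String while the elements are Integers
                .gather(Gatherers.fold(() -> "", (string, number) -> string + number))
                .findFirst()   // fold emits a single element: the final state
                .orElse("");
    }

    public static void main(String[] args) {
        System.out.println(run()); // 123456789
    }
}
```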

  • mapConcurrent is a stateful one-to-one gatherer that applies a function concurrently on virtual threads, up to a configurable concurrency limit, while preserving the stream's encounter order.

Before Java 24, you might fetch several URLs concurrently like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class MapConcurrent {
    public static void main(String[] args) {
        var urls = List.of(
                "https://jsonplaceholder.typicode.com/todos/1",
                "https://jsonplaceholder.typicode.com/todos/2",
                "https://jsonplaceholder.typicode.com/posts/1",
                "https://jsonplaceholder.typicode.com/users/1",
                "https://jsonplaceholder.typicode.com/comments/1"
        );

        try (HttpClient client = HttpClient.newHttpClient()) {
            urls.stream()
                    .map(url -> client.sendAsync(
                                    HttpRequest.newBuilder().uri(URI.create(url)).build(),
                                    HttpResponse.BodyHandlers.ofString()
                            ).thenApply(HttpResponse::body)
                            .exceptionally(e -> "Error: " + e.getMessage()))
                    .toList()
                    .forEach(response -> System.out.println(response.join())); // Wait and print
        }
    }
}        

With Java 24, we can simplify this task using mapConcurrent:

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherers;

public class MapConcurrent {
    public static void main(String[] args) {
        var urls = List.of(
                "https://jsonplaceholder.typicode.com/todos/1",
                "https://jsonplaceholder.typicode.com/todos/2",
                "https://jsonplaceholder.typicode.com/posts/1",
                "https://jsonplaceholder.typicode.com/users/1",
                "https://jsonplaceholder.typicode.com/comments/1"
        );

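        // One HttpClient is thread-safe and can be shared by all concurrent calls
        try (HttpClient client = HttpClient.newHttpClient()) {
            final Function<String, String> fetchData = url -> {
                try {
                    var resp = client.send(
                            HttpRequest.newBuilder().uri(URI.create(url)).build(),
                            HttpResponse.BodyHandlers.ofString());
                    return resp.body();
                } catch (IOException ex) {
                    return "Error: " + ex.getMessage();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    return "Error: interrupted";
                }
            };
            // runs up to urls.size() requests (here, all five) at once on virtual threads;
            // the results keep the original order of the URLs
            urls.stream().gather(Gatherers.mapConcurrent(urls.size(), fetchData))
                    .toList()
                    .forEach(System.out::println);
        }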
    }
}        

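The concurrency is now declared in one place. mapConcurrent runs up to urls.size() requests at a time and returns the results in the original URL order, so there are no CompletableFutures to join by hand.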
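The concurrency limit is hard to observe with real HTTP calls. The sketch below is self-contained and replaces the network request with a hypothetical slowSquare method. That method sleeps briefly and records how many calls overlap, so you can check two things: mapConcurrent never exceeds its limit, and it still returns results in encounter order. The 50 ms delay and the limit of 3 are arbitrary assumptions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class MapConcurrentLimit {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for a slow remote call; tracks how many calls run at once
    static int slowSquare(int n) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } finally {
            inFlight.decrementAndGet();
        }
        return n * n;
    }

    static List<Integer> run() {
        return IntStream.rangeClosed(1, 10).boxed()
                .gather(Gatherers.mapConcurrent(3, MapConcurrentLimit::slowSquare)) // at most 3 at a time
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println("Max calls in flight: " + maxInFlight.get());
    }
}
```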

  • scan is a stateful one-to-one gatherer that applies a function to the current state and the current element to produce the next state, emitting each new state downstream.

Here's how we handled this before Java 24, in an example that computes the interest and principal portions of each installment of a loan:

import java.util.stream.IntStream;

public class Scan {
    public static void main(String[] args) {
        double principal = 10000; // Loan Amount
        double annualInterestRate = 12; // 12% per year
        int numInstallments = 12; // 12 months

        // Convert annual interest rate to monthly interest rate
        double monthlyInterestRate = (annualInterestRate / 100) / 12;
        double emi = (principal * monthlyInterestRate * Math.pow(1 + monthlyInterestRate, numInstallments)) /
                (Math.pow(1 + monthlyInterestRate, numInstallments) - 1);

        System.out.println("Reducing Balance Installment Schedule:");

        // Mutable array to track outstanding balance
        double[] balance = {principal};

        IntStream.rangeClosed(1, numInstallments)
                .forEach(i -> {
                    double interest = balance[0] * monthlyInterestRate;
                    double principalRepayment = emi - interest;
                    balance[0] -= principalRepayment;  // Reduce principal balance

                    System.out.printf("Month %d: Installment = %.2f, Interest = %.2f, Principal = %.2f, Remaining Balance = %.2f\n",
                            i, emi, interest, principalRepayment, balance[0]);
                });
    }
}        

And here’s how it looks in Java 24:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class Scan {
    public static void main(String[] args) {
        final double principal = 10000; // Loan Amount
        final double annualInterestRate = 12; // 12% per year
        final int numInstallments = 12; // 12 months

        // Convert annual interest rate to monthly interest rate
        final double monthlyInterestRate = (annualInterestRate / 100) / 12;
        final double emi = (principal * monthlyInterestRate * Math.pow(1 + monthlyInterestRate, numInstallments)) /
                (Math.pow(1 + monthlyInterestRate, numInstallments) - 1);
        System.out.println("Reducing Balance Installment Schedule:");

        IntStream.rangeClosed(1, numInstallments)
                .mapToObj(i -> new HashMap<String, Double>(Map.of("Month", (double) i)))
                // The initial state only needs the opening balance; scan never emits it
                .gather(Gatherers.scan(() -> Map.of("Balance", principal),
                        (previous, row) -> {
                            // 'previous' is last month's state; 'row' is this month's element
                            double interest = previous.get("Balance") * monthlyInterestRate;
                            double principalRepayment = emi - interest;
                            row.put("Balance", previous.get("Balance") - principalRepayment);
                            row.put("Interest", interest);
                            row.put("PrincipalRepayment", principalRepayment);
                            return row; // emitted downstream and used as the next state
                        }))
                .forEach(entry -> {
                    System.out.printf("Month %d: Installment = %.2f, Interest = %.2f, Principal = %.2f, Remaining Balance = %.2f\n",
                            entry.get("Month").intValue(), emi, entry.get("Interest"), entry.get("PrincipalRepayment"), entry.get("Balance"));
                });
    }
}        

We no longer need an external mutable array to track the state. Gatherers.scan carries the state between iterations, so each month is computed from the previous month's value. What do you think? The code is fancier now, isn't it?
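scan and fold are easy to confuse, since both carry state from one element to the next. The sketch below runs the same sum through both; ScanVersusFold is a hypothetical class name. scan emits every intermediate state, while fold emits only the final one. scan does not emit the initial value returned by the supplier.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ScanVersusFold {
    static List<Integer> runningTotals() {
        // scan emits every intermediate state: 1, 1+2, 1+2+3, ...
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.scan(() -> 0, Integer::sum))
                .toList();
    }

    static List<Integer> finalTotal() {
        // fold emits only the final state
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.fold(() -> 0, Integer::sum))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningTotals()); // [1, 3, 6, 10, 15]
        System.out.println(finalTotal());    // [15]
    }
}
```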

  • windowFixed groups elements into fixed-size lists.

Imagine you need to process orders in batches of five orders each. To break them into groups, you would probably do something like this before Java 24:

import java.util.stream.IntStream;


public class WindowFixed {
    record Order(int orderId) {}

    public static void main(String[] args) {
        final int batchSize = 5;
        var orders = IntStream.rangeClosed(1, 51)
                .mapToObj(Order::new)
                .toList();

        var batches = IntStream
                .range(0, orders.size() % batchSize == 0 ? orders.size() / batchSize : orders.size() / batchSize + 1)
                .mapToObj(i -> orders.subList(i * batchSize, Math.min((i + 1) * batchSize, orders.size())))
                .toList();
        batches.forEach(System.out::println);
    }
}        

In Java 24, you can simply use windowFixed with the batch size, and it will look like this:

import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class WindowFixed {
    record Order(int orderId) {}

    public static void main(String[] args) {
        final int batchSize = 5;
        var orders = IntStream.rangeClosed(1, 51)
                .mapToObj(Order::new)
                .toList();
        var batches = orders.stream()
                .gather(Gatherers.windowFixed(batchSize))
                .toList();
        batches.forEach(System.out::println);
    }
}        

Now it looks simpler and easier to read.
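windowFixed does not pad or drop leftover elements, so the last window can be smaller than the requested size. With 51 orders and a batch size of 5, that means ten batches of five and a final batch with a single order. Here is a minimal sketch with seven numbers; WindowFixedRemainder is a hypothetical class name.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowFixedRemainder {
    static List<List<Integer>> run() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7)
                .gather(Gatherers.windowFixed(3))
                .toList();
    }

    public static void main(String[] args) {
        // the final window holds the single leftover element
        System.out.println(run()); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```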

  • windowSliding is similar to windowFixed, but with overlapping groups.

Moving averages are a great example of where we can use a sliding window. Here's how it was done in Java prior to 24:

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WindowSliding {
    public static void main(String[] args) {
        var data = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0);
        int windowSize = 3;
        var movingAverages = IntStream.rangeClosed(0, data.size() - windowSize)
                .mapToObj(i -> data.subList(i, i + windowSize)
                        .stream()
                        .mapToDouble(Double::doubleValue)
                        .average()
                        .orElse(0.0))
                .collect(Collectors.toList());
        System.out.println(movingAverages);
    }
}        

With Java 24, we can use windowSliding to reduce the complexity of the code above:

import java.util.Arrays;
import java.util.stream.Gatherers;

public class WindowSliding {
    public static void main(String[] args) {
        var data = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0);
        int windowSize = 3;
        var movingAverages = data.stream()
                .gather(Gatherers.windowSliding(windowSize))
                .gather(Gatherers.scan(() -> 0.0,
                        (_, windows) -> windows.stream().mapToDouble(Double::doubleValue)
                                .average().orElse(0.0)))
                .toList();
        System.out.println(movingAverages);
    }
}        

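In the end, windowSliding replaces the index arithmetic and the subList calls, and no collect call is needed. Note that scan is used here only to turn each window into its average; the previous state (_) is ignored, so a plain map would work just as well.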
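For comparison, here is a sketch that first prints the windows windowSliding produces, showing how consecutive windows overlap. It then computes the same moving averages with a plain map instead of scan. WindowSlidingMap is a hypothetical class name.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowSlidingMap {
    static List<List<Double>> windows() {
        // each window overlaps the previous one by windowSize - 1 elements
        return Stream.of(1.0, 2.0, 3.0, 4.0, 5.0)
                .gather(Gatherers.windowSliding(3))
                .toList();
    }

    static List<Double> movingAverages() {
        return Stream.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0)
                .gather(Gatherers.windowSliding(3))
                // a stateless map is enough to turn each window into its average
                .map(window -> window.stream().mapToDouble(Double::doubleValue).average().orElse(0.0))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(windows());        // [[1.0, 2.0, 3.0], [2.0, 3.0, 4.0], [3.0, 4.0, 5.0]]
        System.out.println(movingAverages()); // [2.0, 3.0, 4.0, 5.0, 6.0]
    }
}
```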

Parallel Processing with Gatherers

Parallel execution in Stream.gather operates in two modes:

  1. Without a Combiner — The upstream and downstream run concurrently, similar to parallel().forEachOrdered().
  2. With a Combiner — Supports parallel reductions, akin to parallel().reduce().

Here is an example that uses a parallel-capable gatherer to find the largest prime number between 1 and 10,000:

import java.util.Objects;
import java.util.Optional;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

class LargestPrimeGatherer {

    public static void main(String[] args) {
        Optional<Integer> largestPrime = IntStream.rangeClosed(1, 10000)
                .parallel()                               // evaluate the whole pipeline in parallel
                .boxed()
                .filter(LargestPrimeGatherer::isPrime)    // Keep only prime numbers
                .gather(selectOne(Math::max))             // Custom Gatherer: reduce to the maximum
                .findFirst();                             // Extract the single emitted value

        System.out.println("Largest prime number between 1 and 10,000: " +
                largestPrime.orElse(-1)); // Print result
    }

    // Custom Gatherer that reduces the stream to one element using the given selector
    // (here Math::max, so it yields the largest prime)
    static Gatherer<Integer, ?, Integer> selectOne(java.util.function.BinaryOperator<Integer> selector) {
        Objects.requireNonNull(selector, "selector must not be null");

        // Private state to track information across elements
        class State {
            Integer value = null;  // The current best value
        }

        return Gatherer.of(
                State::new,  // The initializer creates a new State instance

                // The integrator
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    if (state.value == null) {
                        state.value = element;  // First value
                    } else {
                        state.value = selector.apply(state.value, element); // Compare and update max
                    }
                    return true;
                }),

                // The combiner, used during parallel evaluation
                (leftState, rightState) -> {
                    if (leftState.value == null) return rightState;  // If left is empty, take right
                    if (rightState.value == null) return leftState;  // If right is empty, take left
                    leftState.value = selector.apply(leftState.value, rightState.value);  // Select max
                    return leftState;
                },

                // The finisher
                (state, downstream) -> {
                    if (state.value != null)
                        downstream.push(state.value);  // Emit the selected value
                }
        );
    }

    // Prime checking function
    private static boolean isPrime(int num) {
        if (num < 2) return false;
        return IntStream.rangeClosed(2, (int) Math.sqrt(num))
                .noneMatch(divisor -> num % divisor == 0);
    }
}        

This example is more involved. selectOne builds a Gatherer with Gatherer.of(...), supplying an initializer (a private State object), an integrator, a combiner, and a finisher. The method accepts a BinaryOperator that decides which of two values to keep. Here we pass Math::max, which returns the larger of two numbers. Because a combiner is provided, the parallel stream can process chunks independently and then merge their partial results.

Creating Your Own Gatherer

Developers can create custom gatherers in two ways. One is the factory methods Gatherer.of(...), which is parallel-capable as in the example above, and Gatherer.ofSequential(...). The other is implementing the Gatherer interface directly. Here's an example of a sequential gatherer that emits distinct names based on their length. If two names have the same length, it keeps only the first one that appears:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Gatherer;

public class DistinctByLength {
    public static void main(String[] args) {
        // Sequential gatherer whose state is the set of lengths seen so far
        Gatherer<String, Set<Integer>, String> distinctByLength = Gatherer.ofSequential(
                HashSet::new,
                (set, str, downstream) -> {
                    // If this length is new, send 'str' downstream
                    if (set.add(str.length())) {
                        return downstream.push(str);
                    }
                    return true; // length already seen: skip 'str' and keep consuming
                }
        );

        var names = Arrays.asList("amanda", "samantha", "carolina", "davis", "john", "juliana");

        names.stream()
                .gather(distinctByLength)
                .toList() // a List keeps encounter order, so the first name of each length is kept
                .forEach(System.out::println);
    }
}
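The other option mentioned above is to implement Gatherer directly. Here is a sketch of that approach: a hypothetical DistinctBy record (not a JDK type) that generalizes the example to any key. It overrides initializer() and integrator() and keeps the default combiner() and finisher().

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

// Hypothetical gatherer implemented as a type: keeps the first element seen for each key
public record DistinctBy<T, K>(Function<? super T, ? extends K> keyExtractor)
        implements Gatherer<T, Set<K>, T> {

    @Override
    public Supplier<Set<K>> initializer() {
        return HashSet::new; // fresh set of seen keys for each evaluation
    }

    @Override
    public Gatherer.Integrator<Set<K>, T, T> integrator() {
        return Gatherer.Integrator.ofGreedy((seen, element, downstream) -> {
            if (seen.add(keyExtractor.apply(element))) {
                return downstream.push(element); // first element with this key
            }
            return true; // duplicate key: drop it and keep consuming
        });
    }

    // combiner() and finisher() keep their defaults; the default combiner
    // marks this gatherer as sequential-only

    static List<String> distinctByLength(List<String> names) {
        return names.stream()
                .gather(new DistinctBy<String, Integer>(String::length))
                .toList();
    }

    public static void main(String[] args) {
        var names = List.of("amanda", "samantha", "carolina", "davis", "john", "juliana");
        System.out.println(distinctByLength(names)); // [amanda, samantha, davis, john, juliana]
    }
}
```

The record form is reusable with any key extractor. For example, new DistinctBy<String, Character>(s -> s.charAt(0)) would keep the first name for each initial letter.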

Gather vs. Collect

While Collector is used for terminal aggregation, Gatherer is designed for intermediate transformations. Key differences:

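  • Integrator vs. accumulator: a Collector accumulates with a BiConsumer that folds each element into a mutable container. A Gatherer's integrator instead receives the state, the element, and a Downstream object that it can push any number of elements to.
  • Finisher with side effects: a Collector's finisher is a Function that returns the final result. A Gatherer's finisher is a BiConsumer that receives the final state and the Downstream, so it emits elements instead of returning a value.
  • Supports short-circuiting: a gatherer's integrator can return false to stop consuming input early, which a Collector cannot do.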
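Short-circuiting is easiest to see with an infinite stream. In this sketch, a hypothetical takeWhileSumAtMost gatherer keeps a running total as private state. Its integrator returns false as soon as the next element would exceed the limit, which ends an otherwise infinite Stream.iterate. TakeWhileSum is also a hypothetical class name.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeWhileSum {
    // Hypothetical helper: emits elements while the running total stays at or below the limit
    static Gatherer<Integer, ?, Integer> takeWhileSumAtMost(int limit) {
        class State {
            int total = 0; // sum of the elements emitted so far
        }
        return Gatherer.ofSequential(
                State::new,
                (state, element, downstream) -> {
                    if (state.total + element > limit) {
                        return false; // short-circuit: request no more input
                    }
                    state.total += element;
                    return downstream.push(element);
                });
    }

    static List<Integer> run() {
        // Stream.iterate without a predicate is infinite; the gatherer makes it finite
        return Stream.iterate(1, i -> i + 1)
                .gather(takeWhileSumAtMost(20))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5]
    }
}
```

The first five elements sum to 15. Adding 6 would bring the total to 21, so the integrator stops there and toList() completes.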

Conclusion

Java 24's Stream.gather makes stream processing more powerful and expressive. Whether using built-in gatherers or creating custom ones, developers now have a tool that simplifies complex transformations while maintaining readability and efficiency. This feature represents a significant evolution in Java's functional programming capabilities, bridging the gap between simplicity and flexibility.


More articles by Thiago G.
