Reactive Streams specification in Java
Introduction
Reactive Streams is a cross-platform specification for processing a potentially infinite sequence of events across asynchronous boundaries (threads, processes, or network-connected computers) with non-blocking backpressure. A reactive stream contains a publisher that sends forward data, error, and completion events, and subscribers that send backward request and cancel backpressure events. There can also be intermediate processors between the publisher and the subscriber that filter or transform events.
Backpressure is application-level flow control from the subscriber to the publisher to control the sending rate.
The Reactive Streams specification is designed to efficiently process (in terms of CPU and memory usage) time-ordered sequences of events. For efficient CPU usage, the specification describes the contracts for asynchronous and non-blocking event processing in different stages (producers, processors, consumers). For efficient memory usage, the specification describes the contracts for switching between push and pull communication models based on the event processing rate, which avoids using unbounded buffers.
Problems and solutions
When designing systems to transfer items from a producer to a consumer, the goal is to send them with minimal latency and maximum throughput. To achieve this, items must be sent from the producer to the consumer at the maximum rate at which the consumer can process them.
Latency is the time between the producer sending an item and the consumer receiving it.
Throughput is the number of items sent from producer to consumer per unit of time.
However, the producer and the consumer may have limitations that can prevent the system from achieving the best performance:
There are several patterns for sequential item processing that solve some or most of the above limitations:
These patterns fall into two groups: synchronous pull communication models (in which the consumer determines when to receive items from the producer) and asynchronous push communication models (in which the producer determines when to send items to the consumer).
Iterator
In the Iterator pattern, the consumer synchronously pulls items from the producer one by one. The producer sends an item only when the consumer requests it. If the producer does not have an item at the time of the request, it sends an empty response.
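The pull model described above can be sketched with the standard `java.util.Iterator`. This is a minimal illustration, not code from the original article: the names `IteratorPullDemo` and `pullAll` are hypothetical, and `hasNext()` returning `false` stands in for the empty response.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class IteratorPullDemo {

    // The consumer drives the exchange: every next() call pulls exactly one item
    // and the calling thread waits until the producer returns it.
    static <T> List<T> pullAll(Iterator<T> producer) {
        List<T> received = new ArrayList<>();
        while (producer.hasNext()) {        // ask whether another item is available
            received.add(producer.next());  // synchronously pull one item
        }
        return received;
    }

    public static void main(String[] args) {
        Iterator<String> producer = List.of("a", "b", "c").iterator();
        System.out.println(pullAll(producer)); // prints [a, b, c]
    }
}
```

Because the consumer only asks when it is ready, it can never be overwhelmed, but each item costs one full request/response round trip.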
Pros:
Cons:
When using the Iterator pattern, which transfers items one at a time, latency and throughput are often unsatisfactory. To improve these parameters with minimal changes, the same Iterator pattern can transfer items in batches rather than one at a time.
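Batching can be layered on top of an ordinary iterator. The following is a sketch under assumptions: `BatchingIterator` and its `batchSize` parameter are hypothetical names introduced for illustration, and the last batch may be shorter than the others.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Wraps a one-at-a-time iterator and hands out items in batches instead.
final class BatchingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;

    BatchingIterator(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    // One pull now transfers up to batchSize items, amortizing the per-request cost.
    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        Iterator<List<Integer>> batches =
                new BatchingIterator<Integer>(List.of(1, 2, 3, 4, 5).iterator(), 2);
        while (batches.hasNext()) {
            System.out.println(batches.next()); // prints [1, 2], then [3, 4], then [5]
        }
    }
}
```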
Pros:
Cons:
Observer
In the Observer pattern, one or many consumers subscribe to the producer's events. The producer asynchronously pushes events to all subscribed consumers as soon as it generates them. The consumer can unsubscribe from the producer if it does not need further events.
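A minimal sketch of the push model follows. `EventSource` is a hypothetical class, and for brevity it invokes observers directly on the publishing thread, whereas a real producer would typically hand events off asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class EventSource<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    // Registers an observer and returns a handle that unsubscribes it again.
    Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    // The producer decides when to emit; observers have no way to slow it down.
    void publish(T event) {
        for (Consumer<T> observer : observers) {
            observer.accept(event);
        }
    }

    public static void main(String[] args) {
        EventSource<String> source = new EventSource<>();
        List<String> received = new ArrayList<>();
        Runnable unsubscribe = source.subscribe(received::add);
        source.publish("first");
        unsubscribe.run();
        source.publish("second");     // no longer delivered
        System.out.println(received); // prints [first]
    }
}
```

Note that nothing in this interface lets a slow observer tell the producer to wait, which is the gap that backpressure fills.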
Pros:
Cons:
Reactive Extensions
Reactive Extensions (ReactiveX) is a family of multi-platform frameworks for handling synchronous or asynchronous event streams, originally created by Erik Meijer at Microsoft. The implementation of Reactive Extensions for Java is the Netflix RxJava framework.
In simplified terms, Reactive Extensions are a combination of the Observer and Iterator patterns and functional programming. From the Observer pattern, they took the consumer’s ability to subscribe to the producer’s events. From the Iterator pattern, they took the ability to handle event streams of three types (data, error, completion). From functional programming, they took the ability to handle event streams with chained methods (filter, transform, combine, etc.).
Pros:
Cons:
Reactive Streams
Reactive Streams are a further development of Reactive Extensions that uses backpressure to match producer and consumer performance. In simplified terms, Reactive Streams are a combination of Reactive Extensions and batching.
The main difference between them is which side initiates the exchange. In Reactive Extensions, a publisher sends events to a subscriber as soon as they become available and in any number. In Reactive Streams, a publisher must send events to a subscriber only after they have been requested and no more than the requested number.
Pros:
Cons:
Backpressure
There are several solutions to the problem where a producer generates events faster than a consumer processes them. This does not happen in pull communication models because the consumer initiates the exchange. In push communication models, the producer usually cannot determine the right sending rate, so the consumer may eventually receive more events than it can process. Backpressure solves this problem by informing the producer about the processing rate of its consumers.
Without the use of backpressure, the consumer has a few solutions to deal with excessive events:
Any solution that includes dropping events on the consumer may be inefficient because these events still require I/O operations to send them from the producer.
Backpressure in Reactive Streams is implemented as follows. To start receiving events from the producer, the consumer first pulls by requesting the number of items it wants to receive. Only then does the producer push events to the consumer; the producer never sends them on its own initiative. After the consumer has processed all the requested events, the whole cycle repeats. As a special case, if the consumer is known to be faster than the producer, it can work in the push communication model and request all items immediately after subscribing. Conversely, if the consumer is known to be slower than the producer, it can work in the pull communication model and request the next items only after the previous ones have been processed. Thus, the model in which reactive streams operate can be described as a dynamic push/pull communication model. It works effectively whether the producer is faster or slower than the consumer, even when that ratio changes over time.
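The request cycle above can be sketched with the JDK's `java.util.concurrent.Flow` interfaces and `SubmissionPublisher`. The class `DemandDemo`, the nested `BatchSubscriber`, and the `batch` parameter are hypothetical names used only for illustration. A batch of 1 behaves like the pull model, while `Long.MAX_VALUE` effectively switches the stream to the push model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class DemandDemo {

    // Requests `batch` items at a time and asks for the next batch only after
    // the current one has been processed.
    static final class BatchSubscriber<T> implements Flow.Subscriber<T> {
        final CompletableFuture<List<T>> result = new CompletableFuture<>();
        private final List<T> received = new ArrayList<>();
        private final long batch;
        private long remaining;
        private Flow.Subscription subscription;

        BatchSubscriber(long batch) {
            this.batch = batch;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remaining = batch;
            s.request(batch);             // initial demand
        }

        @Override public void onNext(T item) {
            received.add(item);           // "process" the item
            if (--remaining == 0) {       // batch finished: signal new demand
                remaining = batch;
                subscription.request(batch);
            }
        }

        @Override public void onError(Throwable t) { result.completeExceptionally(t); }
        @Override public void onComplete() { result.complete(received); }
    }

    // Publishes 1..5 and returns what a subscriber with the given batch size received.
    static List<Integer> run(long batch) {
        BatchSubscriber<Integer> subscriber = new BatchSubscriber<>(batch);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the buffered items are delivered
        return subscriber.result.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(run(1));              // pull-like: one item per request
        System.out.println(run(Long.MAX_VALUE)); // push-like: effectively unbounded demand
    }
}
```

Both calls deliver the same items in the same order; only the number of `request` signals travelling upstream differs.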
With the use of backpressure, the producer has many more solutions to deal with excessive events:
Which solutions to use for a particular reactive stream depends on the nature of the events. But backpressure is not a silver bullet. It simply shifts the problem of performance mismatch to the producer's side, where it is supposed to be easier to solve. However, in some cases, there are better solutions than backpressure, such as simply dropping excessive events on the consumer's side.
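As one possible illustration of handling the mismatch on the producer's side, the sketch below drops events at the producer when a consumer signals no demand. It uses the JDK's `SubmissionPublisher.offer` method with a drop handler. `DropOnProducerDemo` and `StalledSubscriber` are hypothetical names, and the exact number of dropped items depends on how the JDK sizes its internal buffer, so the example reports the count rather than assuming it.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

final class DropOnProducerDemo {

    // A consumer that subscribes but never requests anything.
    static final class StalledSubscriber<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription s) { /* no request(n) */ }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    // Offers `count` items through a publisher whose per-subscriber buffer is limited
    // to roughly `capacity` items, and returns how many items were dropped.
    static int offerAll(int count, int capacity) {
        AtomicInteger dropped = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), capacity)) {
            publisher.subscribe(new StalledSubscriber<>());
            for (int i = 0; i < count; i++) {
                // offer() does not block: when the buffer is full, the handler runs
                // and the item is dropped before it is ever sent to the consumer.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false; // do not retry
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + offerAll(10, 2));
    }
}
```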
The Reactive Streams specification
Reactive Streams is a specification to provide a standard for asynchronous stream processing with non-blocking backpressure for various runtime environments (JVM, .NET, and JavaScript) and network protocols. The Reactive Streams specification was created by engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter, and others.
The specification describes the concept of reactive streams that have the following features:
The Reactive Streams specification for the JVM (the latest version, 1.0.4, was released on May 26th, 2022) contains the textual specification and the Java API, which contains four interfaces that must be implemented according to this specification. It also includes the Technology Compatibility Kit (TCK), a standard test suite for conformance testing of implementations.
It is important to note that the Reactive Streams specification was created after several mature but incompatible implementations of Reactive Streams already existed. Therefore, the specification is currently limited and contains only low-level APIs. Application developers should use this specification to provide interoperability between existing implementations. To have high-level functional APIs (filter, transform, combine, etc.), application developers should use implementations of this specification (Lightbend Akka Streams, Pivotal Project Reactor, Netflix RxJava, etc.) through their native APIs.
The Reactive Streams API
The Reactive Streams API consists of four interfaces, which are located in the org.reactivestreams package:
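    // A producer of events; subscribe() connects a Subscriber to it.
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }

    // A consumer of the events signaled by a Publisher.
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T item);
        public void onError(Throwable t);
        public void onComplete();
    }

    // The link between one Publisher and one Subscriber, used to signal demand or cancel.
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }

    // A processing stage that is both a Subscriber and a Publisher.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }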
Publisher
The Publisher interface represents a producer of potentially infinite sequenced data and control events. A Publisher produces events according to the demand received from one or many Subscribers.
Demand is the aggregated number of items requested by a Subscriber that have not yet been delivered by the Publisher.
Publishers differ in whether Subscribers receive events that were produced before they subscribed. Cold publishers can be repeated and do not start until they are subscribed (in-memory iterators, file readings, database queries). Hot publishers cannot be repeated and start immediately, regardless of the presence of subscribers (keyboard and mouse events, sensor events, network requests).
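The hot case can be sketched with the JDK's `SubmissionPublisher`, which delivers an item only to the subscribers that exist when the item is submitted. `HotPublisherDemo` is a hypothetical name, and `consume` is used here only as a convenient way to attach a subscriber that requests everything.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class HotPublisherDemo {

    // Returns the items seen by an early and a late subscriber, in that order.
    static List<List<Integer>> run() {
        List<Integer> early = new CopyOnWriteArrayList<>();
        List<Integer> late = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> earlyDone;
        CompletableFuture<Void> lateDone;
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            earlyDone = publisher.consume(early::add);
            publisher.submit(1);                       // only the early subscriber exists
            lateDone = publisher.consume(late::add);
            publisher.submit(2);                       // both subscribers receive this item
        }
        // Both futures complete when the publisher signals onComplete on close().
        CompletableFuture.allOf(earlyDone, lateDone).orTimeout(5, TimeUnit.SECONDS).join();
        return List.of(early, late);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[1, 2], [2]]
    }
}
```

A cold publisher would instead replay the full sequence for every new subscriber.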
This interface has the following method:
Subscriber
The Subscriber interface represents a consumer of events. Multiple Subscribers can subscribe to and unsubscribe from a Publisher at different times.
This interface has the following methods:
Subscription
The Subscription interface represents a connection between a Publisher and a Subscriber. Through a Subscription, the Subscriber can request items from the Publisher or cancel the connection.
This interface has the following methods:
Processor
The Processor interface represents a processing stage that extends the Subscriber and Publisher interfaces and is subject to the contracts of both. It acts as a Subscriber for the previous stage of a reactive stream and as a Publisher for the next one.
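A transforming stage can be sketched by extending the JDK's `SubmissionPublisher`, in the style of the example in its Javadoc. `MapProcessor` and its `demo` method are hypothetical names. The stage requests one item at a time from upstream and relies on `submit` blocking when downstream buffers are full, which is a simplification rather than a fully non-blocking design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    // Subscriber side: receives items from the previous stage.
    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        submit(mapper.apply(item)); // Publisher side: forwards the transformed item
        upstream.request(1);        // ask for the next item only after forwarding
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }

    // Wires source -> processor -> collecting consumer and returns the collected items.
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> done;
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MapProcessor<Integer, String> processor = new MapProcessor<>(i -> "#" + i);
            done = processor.consume(out::add); // downstream stage
            source.subscribe(processor);        // upstream stage
            for (int i = 1; i <= 3; i++) {
                source.submit(i);
            }
        }
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [#1, #2, #3]
    }
}
```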
The Reactive Streams workflow
The Reactive Streams workflow consists of three steps: establishing a connection, exchanging data and control events, and successfully or exceptionally terminating the connection.
When a Subscriber wants to start receiving events from a Publisher, it calls the Publisher.subscribe(Subscriber) method. If the Publisher accepts the request, it creates a new Subscription instance and invokes the Subscriber.onSubscribe(Subscription) method. If the Publisher rejects the request or otherwise fails, it invokes the Subscriber.onError(Throwable) method.
Once the Publisher and the Subscriber establish a connection with each other through the Subscription instance, the Subscriber can request events, and the Publisher can send them. When the Subscriber wants to receive events, it calls the Subscription.request(long) method with the number of items requested. Typically, the first such call occurs in the Subscriber.onSubscribe(Subscription) method. The Publisher sends each requested item by calling the Subscriber.onNext(T) method only in response to a previous request. A Publisher can send fewer events than requested if the reactive stream ends, but then must call either the Subscriber.onComplete() or the Subscriber.onError(Throwable) method.
If the Subscriber wants to stop receiving events, it calls the Subscription.cancel() method. After calling this method, the Subscriber may still receive events that satisfy previously requested demand. Once a Subscription is canceled, its Subscriber does not receive Subscriber.onComplete() or Subscriber.onError(Throwable) events.
When there are no more events, the Publisher completes the Subscription successfully by calling the Subscriber.onComplete() method. When an unrecoverable exception occurs in the Publisher, it completes the Subscription exceptionally by calling the Subscriber.onError(Throwable) method. After Subscriber.onComplete() or Subscriber.onError(Throwable) has been invoked, the Publisher will not send any other events to the Subscriber through the current Subscription.
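Cancellation can be sketched with a subscriber that stops the stream after a fixed number of items. `TakeSubscriber` and `firstTwo` are hypothetical names, and the JDK's `java.util.concurrent.Flow` interfaces are used in place of the `org.reactivestreams` ones. Because no terminal signal is expected after `cancel()`, the subscriber completes its own result itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class TakeSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<List<T>> result = new CompletableFuture<>();
    private final List<T> received = new ArrayList<>();
    private final int limit;
    private Flow.Subscription subscription;

    TakeSubscriber(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE); // ask for everything, then cancel when satisfied
    }

    @Override public void onNext(T item) {
        if (received.size() >= limit) {
            return;                 // tolerate items that arrive after cancel()
        }
        received.add(item);
        if (received.size() == limit) {
            subscription.cancel();  // no onComplete/onError is expected after this
            result.complete(List.copyOf(received));
        }
    }

    @Override public void onError(Throwable t) { result.completeExceptionally(t); }

    // Reached only if the stream ends before `limit` items were received.
    @Override public void onComplete() { result.complete(List.copyOf(received)); }

    static List<Integer> firstTwo() {
        TakeSubscriber<Integer> subscriber = new TakeSubscriber<>(2);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        }
        return subscriber.result.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(firstTwo()); // prints [1, 2]
    }
}
```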
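The three steps of the workflow can be traced with a small synchronous, cold publisher. This is a single-threaded sketch written against the JDK's `java.util.concurrent.Flow` interfaces, which mirror the `org.reactivestreams` ones. `RangePublisher` and `trace` are hypothetical names, and the code has not been verified against the TCK, so it should not be read as a compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Cold, synchronous publisher of the integers start .. start + count - 1.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    // Step 1: establish the connection by handing the Subscriber a new Subscription.
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int end;     // exclusive
        private int next;
        private long demand;       // requested but not yet delivered
        private boolean emitting;  // avoids recursion when onNext calls request
        private boolean done;      // set on completion, error, or cancel

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int from, int end) {
            this.subscriber = subscriber;
            this.next = from;
            this.end = end;
        }

        // Step 2: deliver items only in response to demand.
        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be positive: " + n));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
            if (emitting) return; // the loop that is already running sees the new demand
            emitting = true;
            while (!done && demand > 0 && next < end) {
                demand--;
                subscriber.onNext(next++);
            }
            emitting = false;
            // Step 3: terminate successfully once the range is exhausted.
            if (!done && next == end) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // Runs one subscriber that requests items one at a time and logs every signal.
    static List<String> trace(int count) {
        List<String> log = new ArrayList<>();
        new RangePublisher(1, count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                subscription = s;
                s.request(1);
            }
            @Override public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                subscription.request(1);
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(2)); // prints [onSubscribe, onNext(1), onNext(2), onComplete]
    }
}
```

Even this small sketch needs a re-entrancy guard and overflow handling, which hints at why hand-written implementations are easy to get wrong.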
The JDK Flow API
The JDK has supported the Reactive Streams specification since version 9 in the form of the Flow API. The Flow class contains the nested static interfaces Publisher, Subscriber, Subscription, and Processor, which are 100% semantically equivalent to their respective Reactive Streams counterparts. The Reactive Streams specification for the JVM includes the FlowAdapters class, which is a bridge between the Reactive Streams API (the org.reactivestreams package) and the JDK Flow API (the java.util.concurrent.Flow class). The only general-purpose implementation of the Reactive Streams specification that the JDK provides so far is the SubmissionPublisher class, which implements the Publisher interface.
Code examples
Cold synchronous reactive stream
This document describes the implementation of a synchronous Producer, a synchronous Consumer, and a cold reactive stream created from them.
Cold asynchronous reactive stream
This document describes the implementation of an asynchronous Producer, an asynchronous Consumer, and a cold reactive stream created from them.
Hot asynchronous reactive stream
This document describes the implementation of an asynchronous Producer and an asynchronous Processor extending the SubmissionPublisher class, and a hot reactive stream created from them.
Conclusion
Before Reactive Streams appeared in the JDK, there were the related CompletableFuture and Stream APIs. The CompletableFuture API uses the push communication model but supports asynchronous computation of only a single value. The Stream API supports synchronous or asynchronous computations of multiple values but uses the pull communication model. Reactive Streams fill this gap: they support synchronous or asynchronous computations of multiple values and can also dynamically switch between the push and pull communication models. Therefore, Reactive Streams are suitable for processing sequences of events with unpredictable rates, such as mouse and keyboard events, sensor events, and latency-bound I/O events from a file or network.
Crucially, application developers should not implement the interfaces of the Reactive Streams specification themselves. First, the specification is complex, especially in its asynchronous contracts, and is hard to implement correctly. Second, the specification does not contain APIs for intermediate stream operations. Instead, application developers should implement the reactive stream stages (producers, processors, consumers) using existing frameworks (Lightbend Akka Streams, Pivotal Project Reactor, Netflix RxJava) with their much richer native APIs. They should use the Reactive Streams API only to combine heterogeneous stages into a single reactive stream.
Complete code examples are available in the GitHub repository.