Introduction to Pub/Sub and Streams with Redis & Spring Boot
Aneshka Goyal
AWS Certified Solutions Architect | Software Development Engineer III at Egencia, An American Express Global Business Travel Company
Publish/Subscribe
Problem: Let's say we have synchronous messaging between two components of our system, Service A (sender) and Service B (receiver). Service A sends a message to Service B and then blocks waiting for the acknowledgement, so it cannot serve another receiver, say Service C.
Solution: The Pub/Sub (Publisher/Subscriber) model is a messaging pattern used in software architecture to facilitate asynchronous communication between different components or systems. In this model, publishers produce messages that are consumed by subscribers. Publishers publish messages without any knowledge of the consumers (subscribers), and consumers consume these messages without knowing who produced them. The main idea behind this pattern is to decouple the components of a system.
Since neither of the two main entities (publishers and subscribers) is aware of the other, message brokers play a crucial role in routing messages.
A key concept of the Pub/Sub model is the subscription. A subscription represents a connection between a subscriber and a topic. Subscriptions define which messages a subscriber will receive, based on the topics to which it is subscribed. Subscriptions can also carry different configurations, such as message delivery guarantees (e.g., at-most-once, at-least-once) and acknowledgment mechanisms.
Some well-known implementations of this pattern are Redis, RabbitMQ and Kafka (and there are many others).
Since the title promises an implementation with Redis, let's start with its delivery semantics: Redis Pub/Sub exhibits at-most-once message delivery. As the name suggests, a message will be delivered once, if at all. Once the message is sent by the Redis server, there is no chance of it being sent again. If the subscriber is unable to handle the message (for example, due to an error or a network disconnect), the message is lost forever.
Redis, however, also has streams to facilitate at-least-once delivery. In this blog post we will explore both of these flavours with Redis.
Redis Pub/Sub
Redis describes itself as the world's fastest in-memory database, providing cloud and on-prem solutions for caching, vector search and NoSQL that fit into almost any tech stack. Because Redis reads and writes data primarily in RAM, it naturally has an edge in speed, even when considered for Pub/Sub.
Apart from being an in-memory database commonly leveraged as a cache, Redis also provides Pub/Sub capability.
Now that we know what the publish/subscribe pattern is and what Redis is, let's build a small application to see Redis Pub/Sub in action and drill into the various key areas.
Hands On!
Redis Running Locally
First, let's run a Redis server locally. On macOS we can install Redis using brew; if running Redis on Windows, we can either use the Docker image or follow the steps here.
brew install redis
Once Redis is installed and the service is running, we should be able to ping the Redis server and get the PONG response.
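For example, with Homebrew the service can be started and then pinged from the terminal (standard Homebrew and redis-cli commands):
brew services start redis
redis-cli ping
The ping should come back with PONG.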
SpringBoot Application Setup
As the next step, we will set up and initialise a Spring Boot application that lets us create publishers, topics and subscriptions to those topics by subscribers.
We will use Spring Initializr to initialise an application with Maven for dependency management and Java 17. The pom file for the application looks like the one below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>redisDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redisDemo</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
We will leverage spring-boot-starter-data-redis to work with Redis and spring-boot-starter-web to expose some endpoints (just for demo purposes).
Redis Configuration for Pub/Sub
First, let's take a look at our application.properties file:
spring.application.name=redisDemo
redis.data.host=127.0.0.1
redis.data.port=6379
redis.order.topic=order
Here we have defined the host and port used to connect to Redis (running locally), as well as the name of the topic to which publishers will publish messages and to which subscribers can subscribe in order to receive them.
Below is the RedisConfiguration.java that holds the configuration for the Redis connection, publishing and subscriptions. Let's take a look at the need for each of the beans specified in this file.
@Configuration
public class RedisConfiguration {

    @Value("${redis.order.topic}")
    private String topic;

    @Value("${redis.order.stream.key}")
    private String streamKey;

    @Value("${redis.data.host}")
    private String redisHost;

    @Value("${redis.data.port}")
    private int redisPort;

    @Bean
    public LettuceConnectionFactory redisPreferredConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort),
                LettuceClientConfiguration.builder().build());
    }

    @Bean
    StringRedisTemplate stringRedisTemplate(@Qualifier("redisPreferredConnectionFactory") RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic(topic);
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(ChannelTopic topic,
            @Qualifier("redisPreferredConnectionFactory") RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListener(), topic);
        container.addMessageListener(messageListenerPostProcessing(), topic);
        container.addMessageListener(messageListenerPostProcessing(), PatternTopic.of("consumer-*"));
        return container;
    }

    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new OrderEventListener(), "listenToOrderUpdate");
    }

    @Bean
    public MessageListenerAdapter messageListenerPostProcessing() {
        return new MessageListenerAdapter(new OrderPostProcessingEventListener(), "listenToOrderUpdate");
    }
}
Here, first we use a LettuceConnectionFactory to build a connection to the local Redis.
There is another option for the ConnectionFactory, namely Jedis (JedisConnectionFactory). Compared to Jedis, Lettuce is built on Netty, its connections are thread-safe and can be shared across threads, and it supports asynchronous and reactive usage, whereas Jedis connections are not thread-safe and are typically used with a connection pool.
Next we define a StringRedisTemplate. RedisTemplate offers a high-level abstraction for Redis interactions: while RedisConnection offers low-level methods that accept and return binary values (byte arrays), the template takes care of serialization and connection management, freeing the user from such details. Redis stores everything as binary, so we can define which serializer to use (RedisTemplate defaults to a Java-serialization-based serializer for most of its operations); since we are using StringRedisTemplate here, it uses a StringRedisSerializer internally.
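If plain String values were not enough, a RedisTemplate with explicit serializers could be declared instead. Below is a minimal sketch under that assumption (the orderRedisTemplate bean name and the Order value type are illustrative, not part of the original configuration):
@Bean
public RedisTemplate<String, Order> orderRedisTemplate(
        @Qualifier("redisPreferredConnectionFactory") RedisConnectionFactory connectionFactory) {
    // Keys serialized as plain strings, values as JSON via Jackson.
    RedisTemplate<String, Order> template = new RedisTemplate<>();
    template.setConnectionFactory(connectionFactory);
    template.setKeySerializer(new StringRedisSerializer());
    template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Order.class));
    return template;
}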
Next we define the channel topic. A topic is either a ChannelTopic or a PatternTopic; here a bean is created for a channel topic, while we also register a pattern topic directly on the listener container:
consumer-*
A pattern topic allows subscriptions to all topics whose names match a specific pattern.
Now we have configured the connection factory and leveraged it to provide a RedisTemplate that producers can use to write messages to a topic. Let's see the configuration needed for a subscriber/subscription to work.
Just as RedisConnection provides low-level methods for publishing, it also provides low-level methods for subscriptions.
On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching.
At the low level, RedisConnection offers the subscribe and pSubscribe methods, which map to the Redis commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or to query whether it is listening, RedisConnection provides the getSubscription and isSubscribed methods.
Subscription commands in Spring Data Redis are blocking. That is, calling subscribe on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the subscription is canceled, which happens when another thread invokes unsubscribe or pUnsubscribe on the same connection. Message listener containers provide a solution to this problem.
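To make the blocking nature concrete, a low-level subscription would look roughly like the sketch below (illustrative only, not part of our application; connectionFactory stands for any RedisConnectionFactory):
// The subscribe call blocks the calling thread until the subscription is cancelled
// from another thread via unsubscribe/pUnsubscribe.
RedisConnection connection = connectionFactory.getConnection();
connection.subscribe(
        (message, pattern) -> System.out.println("Received: " + new String(message.getBody())),
        "order".getBytes(StandardCharsets.UTF_8));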
Message Listener Containers
Due to its blocking nature, low-level subscription (using RedisConnection) is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers RedisMessageListenerContainer, which does all the heavy lifting. It is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).
RedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and drive the MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between a message-driven POJO and a messaging provider, and it takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like.
Thus, here we have created a RedisMessageListenerContainer with the connection factory, and MessageListenerAdapters are injected to specify which topic each listener listens to. Since it is an adapter, we can specify a different method name (following the standard signature) to invoke when a message is received by the listener.
Redis Publisher
Let's now see the Java code that publishes to the Redis topic(s).
@Component
public class OrderPublisher {

    @Value("${redis.order.topic}")
    private String topic;

    private final StringRedisTemplate redisTemplate;

    @Inject
    public OrderPublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void publishEvent(Order order) throws JsonProcessingException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(order);
        redisTemplate.convertAndSend(topic, msg);
        redisTemplate.convertAndSend("consumer-order", msg);
    }
}
Here we are sending the same message, containing some order info, to two topics: one named order and the other named consumer-order.
We use the convertAndSend method of the RedisTemplate (the StringRedisTemplate we configured a step earlier) to send the message to a specific topic.
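The Order payload used by the publisher and the listeners is not shown in the original code; a minimal sketch of what it could look like (an assumption on my part, with just the two fields used in the examples) is:
public class Order {

    private String id;
    private String status;

    // A no-args constructor plus getters/setters so Jackson can (de)serialize it.
    public Order() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}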
Let's take a look at the two subscribers we configured in the RedisConfiguration.
Redis Subscriber
public class OrderEventListener {

    public void listenToOrderUpdate(String message, String topic) throws IOException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = objectMapper.readValue(message, Order.class);
        System.out.println("got order with id " + order.getId());
    }
}
Here we print the thread name followed by the message content that we received.
There is another listener that is subscribed to the same channel topic order and is also subscribed, via the pattern consumer-*, to the consumer-order topic.
public class OrderPostProcessingEventListener {

    public void listenToOrderUpdate(String message, String topic) throws IOException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = objectMapper.readValue(message, Order.class);
        System.out.println("OrderPostProcessingEventListener|got order with id " + order.getId());
    }
}
Here too we print the thread name and the order message details.
We have configured the publisher, subscribers and topics, and we have the Redis message broker running locally; now we need a trigger that publishes these messages.
In this blog post we will publish messages via a REST endpoint and also via the terminal client, redis-cli.
@RestController
@RequestMapping("/v1/orders")
public class DemoController {

    private final OrderPublisher publisher;

    public DemoController(OrderPublisher publisher) {
        this.publisher = publisher;
    }

    @PostMapping("")
    public void publishOrderEvent(@RequestBody Order order) throws JsonProcessingException {
        publisher.publishEvent(order);
    }
}
Here we simply make a REST call, which triggers the publisher to publish the order message to both topics, and thus the subscribers are invoked once the message is available.
Let's see the output when triggering the below API call.
http://localhost:8080/v1/orders
{
  "id": "23",
  "status": "Processed"
}
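For instance, the request could be sent with curl (shown only as an illustration):
curl -X POST http://localhost:8080/v1/orders \
  -H "Content-Type: application/json" \
  -d '{"id":"23","status":"Processed"}'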
Here we can see four different thread names printed: the first is from the publisher log, and the remaining three belong to listener container threads, since the subscribers are invoked a total of three times (one subscriber is invoked twice because it is subscribed to both topics, order and consumer-order).
Output when the message is published via redis-cli
127.0.0.1:6379> PUBLISH order "{\"id\":\"06\",\"status\":\"Processed\"}"
(integer) 1
127.0.0.1:6379>
Here we are publishing just one message to the channel topic order via the CLI.
Since the message was published to just one topic, both subscribers are invoked only once each, hence the corresponding output logs.
Redis Pub/Sub key features:
Fire and Forget: Only delivers messages in real-time and does not retain any history. Once a message is published, it is not accessible to subscribers who missed it.
No Message acknowledgement: Does not support message acknowledgments. Every subscriber receives a copy of the message, but there's no mechanism for handling message processing or ensuring all subscribers receive and process each message correctly.
Redis Pub/Sub use case:
When we require low-latency, real-time message distribution without the need for message retention or complex processing. Suitable for applications where instant delivery is more critical than message durability.
There is another use case, where we want message history to be maintained, with support for consumers to consume messages when they come back online (based on the offset of the last consumed message). It should also support replay of messages. This is the ideal scenario when message durability is critical, and it leads us to Redis streams.
Redis Streams
A Redis stream is a data structure that acts like an append-only log but also implements several operations to overcome some of the limits of a typical append-only log. These include random access in O(1) time and complex consumption strategies, such as consumer groups.
What is a Consumer Group?
A consumer group is a group of consumers that collectively read messages from the same stream. Each message in the stream is delivered to only one consumer within the group, allowing the workload to be distributed among multiple consumers.
Message Distribution:
Load Balancing: Within a consumer group, messages are distributed among consumers in a load-balanced manner. Each message is delivered to exactly one consumer in the group. This enables horizontal scaling, as we can add more consumers to handle a larger volume of messages.
Message Delivery Guarantees: Each message is delivered to a single consumer within the group, reducing the risk of duplicate processing; combined with acknowledgments, this ensures that each message is handled by the group at least once.
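As a quick, purely illustrative look at consumer groups from redis-cli (the order-group and consumer-1 names below are examples, not the ones our application will use):
XGROUP CREATE stream-order order-group $ MKSTREAM
XREADGROUP GROUP order-group consumer-1 COUNT 10 STREAMS stream-order >
XACK stream-order order-group <entry-id>
XGROUP CREATE creates the group, XREADGROUP reads new entries on behalf of a named consumer in that group, and XACK acknowledges an entry (the id being whatever XREADGROUP returned).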
Along similar lines, let's now enhance the application we have so far so that it can publish to a Redis stream and subscribe to the stream for messages.
Redis Configuration for Streams
First we add the stream name to our application.properties file.
redis.order.stream.key=stream-order
Next let's enhance the redis configurations for stream subscription by creating a consumer group and attaching a consumer to it.
//-----------Redis Stream Configuration----------------------------------
@Bean
public Subscription subscription(StringRedisTemplate stringRedisTemplate) throws UnknownHostException {
    createConsumerGroupIfNotExists(stringRedisTemplate, streamKey, streamKey);
    StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
            StreamMessageListenerContainer
                    .StreamMessageListenerContainerOptions
                    .builder()
                    .pollTimeout(Duration.ofMillis(100))
                    .targetType(String.class)
                    .build();
    StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
            StreamMessageListenerContainer
                    .create(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()), options);
    Subscription subscription =
            container.receive(Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                    streamOffset, orderStreamListener(stringRedisTemplate));
    container.start();
    return subscription;
}

@Bean
public StreamListener<String, ObjectRecord<String, String>> orderStreamListener(StringRedisTemplate stringRedisTemplate) {
    // handle message from stream
    return new OrderStreamListener(stringRedisTemplate);
}

private void createConsumerGroupIfNotExists(StringRedisTemplate stringRedisTemplate,
        String streamKey, String groupName) {
    try {
        stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0-0"), groupName);
    } catch (RedisSystemException exception) {
        // Typically a BUSYGROUP error when the group already exists.
        System.out.println(exception.getCause().getMessage());
    }
}
We do not need any explicit publisher configuration, as we will leverage the StringRedisTemplate. Here as well we have a choice between the low-level RedisConnection and the high-level StreamOperations (which we will see when we look at the Redis stream publisher).
For the consumer, we configured the options so that the stream is polled every 100 milliseconds. Here too we are not leveraging the low-level RedisConnection but the StreamMessageListenerContainer, for high-level abstraction and management of the boilerplate, so that our consumer can focus on the business logic.
Note: the consumer group is created just once (when the stream is initialised for the first time); on later startups the createGroup call fails with a BUSYGROUP error, which the catch block above simply logs.
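Once the application has started, the group can be inspected from redis-cli (illustrative):
127.0.0.1:6379> XINFO GROUPS stream-order
This lists the consumer groups attached to the stream-order stream, along with their consumers and pending counts.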
Redis Stream Publisher
@Component
public class OrderStreamPublisher {

    @Value("${redis.order.stream.key}")
    private String key;

    private final StringRedisTemplate redisTemplate;

    @Inject
    public OrderStreamPublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void produce(Order order) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(order);
        ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(msg)
                .withStreamKey(key);
        RecordId recordId = this.redisTemplate.opsForStream()
                .add(record);
        System.out.println(recordId);
    }
}
The publisher leverages the high-level RedisTemplate and StreamRecords, instead of the low-level RedisConnection, to create an ObjectRecord and then add it to the stream.
Once the record is added to the stream successfully, we get back a unique identifier (RecordId), which is printed in the last statement.
Redis Stream Listener
public class OrderStreamListener implements StreamListener<String, ObjectRecord<String, String>> {

    @Value("${redis.order.stream.key}")
    private String key;

    private final StringRedisTemplate redisTemplate;

    public OrderStreamListener(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        String orderString = message.getValue();
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = null;
        try {
            order = objectMapper.readValue(orderString, Order.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        System.out.println(order.getId());
        // The group name equals the stream key in our configuration, so the key is passed as the group here.
        redisTemplate.opsForStream().acknowledge(key, message);
    }
}
This is the listener that is invoked when the listener container gets a message from the stream. Specifically, the StreamListener's onMessage method is invoked; it holds the business logic, which here is just logging the message details.
Point to note: the last line is where the consumer acknowledges the message, so that it is removed from the consumer group's pending list; this is how at-least-once message delivery is achieved. If a message is not acknowledged by the listener, it remains pending for the consumer that received it and can later be claimed (for example with XCLAIM or XAUTOCLAIM) and re-processed by another consumer in the same consumer group.
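The pending (unacknowledged) entries can be inspected from redis-cli; note that in our configuration the group name is the same as the stream key:
127.0.0.1:6379> XPENDING stream-order stream-order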
Next, let's see things in action. Just like with Pub/Sub, here too we will trigger an API call to add a message to the stream.
We will enhance the controller as below, adding the stream publisher and a corresponding endpoint.
@RestController
@RequestMapping("/v1/orders")
public class DemoController {

    private final OrderPublisher publisher;
    private final OrderStreamPublisher orderStreamPublisher;

    public DemoController(OrderPublisher publisher, OrderStreamPublisher orderStreamPublisher) {
        this.publisher = publisher;
        this.orderStreamPublisher = orderStreamPublisher;
    }

    @PostMapping("")
    public void publishOrderEvent(@RequestBody Order order) throws JsonProcessingException {
        publisher.publishEvent(order);
    }

    @PostMapping("/stream")
    public void publishOrderStreamEvent(@RequestBody Order order) throws JsonProcessingException {
        orderStreamPublisher.produce(order);
    }
}
Output of invoking the below endpoint is as follows
http://localhost:8080/v1/orders/stream
{
  "id": "23",
  "status": "Processed"
}
Here the first line is the unique identifier (RecordId) logged by the publisher, and the second is the order id logged by the stream consumer.
We can also publish to the stream we created via redis-cli.
127.0.0.1:6379> XADD stream-order * order "{\"id\": \"123\", \"status\": \"Processing\"}"
"1723877635854-0"
Here we perform an add operation (XADD) to append an order and, since we passed * in the XADD command, get back the unique id generated by Redis once the message is successfully added. The id format is as follows, giving a monotonically increasing, unique identifier.
<millisecondsTime>-<sequenceNumber>
Here we see just the order id printed by the stream consumer.
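To see what is actually stored in the stream, the entries can be listed with XRANGE (illustrative):
127.0.0.1:6379> XRANGE stream-order - +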
Redis Streams use case:
When we need durable message storage, the ability to replay messages, manage message acknowledgments, and process messages in a distributed manner. Ideal for scenarios requiring reliable delivery, consumer coordination, and historical data access.
Summary
Here we looked at Pub/Sub as a pattern and dived into the Redis implementation of Pub/Sub, understanding its configuration, features and use cases.
We looked at the fire-and-forget use case for Pub/Sub as well as at leveraging Redis streams for at-least-once message delivery. We dived into the implementation of Redis streams and the concepts of consumer groups, offsets and acknowledgements, and looked at the use case for streams as well. Both fire-and-forget and at-least-once delivery have their own pros and cons and are suitable for the different kinds of scenarios we deal with.