Introduction to Pub-Sub and Streams with Redis&SpringBoot

Publish/Subscribe

Problem: Suppose two components of our system, service A (the sender) and service B (the receiver), communicate synchronously. Service A receives a request from service B, sends it a message, and then blocks until it gets an acknowledgement, so in the meantime it cannot serve another receiver, say service C.

Solution: The Pub/Sub (Publisher/Subscriber) model is a messaging pattern used in software architecture to facilitate asynchronous communication between components or systems. In this model, publishers produce messages that subscribers consume. Publishers publish messages without any knowledge of the subscribers, and subscribers consume messages without knowing who produced them. The main idea behind the pattern is to decouple the components of a system.

Since neither of the two main entities (publishers or subscribers) is aware of the other, message brokers play a crucial role in routing messages.

The key components of the Pub/Sub model are:

  • Publishers: Entities that generate and send messages to a specific topic.
  • Subscribers: Entities that receive and consume messages by subscribing to a topic.
  • Topics: Channels or categories to which messages are published. Publishers send messages to specific topics, and subscribers can subscribe to one or more topics to receive messages of interest.
  • Message Brokers: Intermediaries that manage the routing of messages between publishers and subscribers. A message broker receives a message on a topic and checks for subscribers registered for that topic; if there are any, the message is sent to them, otherwise it is dropped.
  • Message: A Message is the unit of data exchanged between publishers and subscribers in the Pub/Sub system. Messages can contain any type of data, such as text, JSON, or binary data.

A Subscription represents a connection between a subscriber and a topic. Subscriptions define which messages a subscriber will receive based on the topics to which it is subscribed. Subscriptions can have different configurations, such as message delivery guarantees (e.g., at-most-once, at-least-once) and acknowledgment mechanisms.
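To make the roles above concrete, here is a minimal in-memory sketch of a broker. The class names InMemoryBroker and BrokerDemo are hypothetical and this is not how Redis is implemented; it only illustrates that publishers and subscribers meet through a topic name, and that a message published while nobody is subscribed is dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory broker, for illustration only (not the Redis implementation).
class InMemoryBroker {
    // topic name -> subscribers registered for that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriber);
    }

    // Returns how many subscribers received the message; 0 means it was dropped.
    int publish(String topic, String message) {
        List<Consumer<String>> registered = subscribers.getOrDefault(topic, List.of());
        for (Consumer<String> subscriber : registered) {
            subscriber.accept(message);
        }
        return registered.size();
    }
}

class BrokerDemo {
    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        List<String> received = new ArrayList<>();

        // Nobody is subscribed yet, so this message is dropped rather than stored.
        int first = broker.publish("order", "order-1");

        // The publisher never references the subscriber, only the topic name.
        broker.subscribe("order", received::add);
        int second = broker.publish("order", "order-2");

        System.out.println("receivers: " + first + ", then " + second);
        System.out.println("received: " + received);
    }
}
```

The subscriber only ever sees the second message, which is the at-most-once behaviour discussed for Redis Pub/Sub in this post.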


Some implementations of this pattern are Redis, RabbitMQ and Kafka (among many others).

Since this post implements the pattern with Redis, note that Redis Pub/Sub exhibits at-most-once message delivery semantics. As the name suggests, a message is delivered once, if at all. Once the Redis server has sent the message, there is no chance of it being sent again. If the subscriber is unable to handle the message (for example, due to an error or a network disconnect), the message is lost forever.

But Redis also has streams, which support at-least-once delivery. In this blog post we will explore both of these flavours with Redis.

Redis Pub/Sub

Redis is an in-memory data store, available both in the cloud and on-premises, that is commonly used for caching, vector search, and as a NoSQL database. Because Redis reads and writes data primarily in RAM, it is fast, and that speed carries over to Pub/Sub as well.

Apart from being an in-memory database that is often used as a cache, Redis also supports the Pub/Sub pattern.

Now that we know the publish/subscribe pattern and what Redis is, let's build a small application to see Redis Pub/Sub in action and drill into its key areas.

Hands On!

Redis Running Locally

First, let's run a Redis server locally. We can install Redis using brew; on Windows, we can either use the Docker image or follow the steps in here.

brew install redis        

Once Redis is installed and the service is running, we can ping the Redis server (PING) and get the response PONG.

SpringBoot Application Setup

Next, we set up a Spring Boot application that lets us create publishers, topics, and subscriptions to these topics by subscribers.

We will use Spring Initializr to initialize an application with Maven for dependency management and Java 17. The pom file for the application looks like below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.3.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>redisDemo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>redisDemo</name>
	<description>Demo project for Spring Boot</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>jakarta.inject</groupId>
			<artifactId>jakarta.inject-api</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>
        

We use spring-boot-starter-data-redis to work with Redis and spring-boot-starter-web to expose some endpoints (just for demo purposes).

Redis Configuration for Pub/Sub

First, let's take a look at our application properties file:

spring.application.name=redisDemo
redis.data.host=127.0.0.1
redis.data.port=6379
redis.order.topic=order        

Here we have defined the host and port used to connect to Redis (running locally) and the name of the topic to which publishers publish messages and to which subscribers subscribe in order to receive them.

Below is RedisConfiguration.java, which holds the configuration for the Redis connection, publishing, and subscriptions. Let's take a look at why each of the beans in this file is needed.

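import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration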
public class RedisConfiguration {

    @Value("${redis.order.topic}")
    private String topic;
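    // Used by the stream configuration later in this post; redis.order.stream.key
    // must be present in application.properties for this placeholder to resolve.
    @Value("${redis.order.stream.key}")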
    private String streamKey;
    @Value("${redis.data.host}")
    private String redisHost;
    @Value("${redis.data.port}")
    private int redisPort;

    @Bean
    public LettuceConnectionFactory redisPreferredConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration(redisHost, redisPort), LettuceClientConfiguration.builder().build());
    }

    @Bean
    StringRedisTemplate stringRedisTemplate(@Qualifier("redisPreferredConnectionFactory") RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic(topic);
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(ChannelTopic topic, @Qualifier("redisPreferredConnectionFactory") RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListener(), topic);
        container.addMessageListener(messageListenerPostProcessing(), topic);
        container.addMessageListener(messageListenerPostProcessing(), PatternTopic.of("consumer-*"));
        return container;
    }

    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new OrderEventListener(), "listenToOrderUpdate");
    }

    @Bean
    public MessageListenerAdapter messageListenerPostProcessing() {
        return new MessageListenerAdapter(new OrderPostProcessingEventListener(), "listenToOrderUpdate");
    }
}        

Here we first use LettuceConnectionFactory to build a connection to the local Redis.

Another option for the connection factory is Jedis (JedisConnectionFactory).

LettuceConnectionFactory in comparison to JedisConnectionFactory:

  • Generally considered to be more performant and scalable compared to Jedis, especially in multi-threaded environments.
  • Supports asynchronous and reactive programming models, which can improve performance in high-concurrency scenarios.
  • Its connections are thread-safe and can be shared across threads, which reduces the overhead of establishing and closing connections.

Next we define a StringRedisTemplate. RedisTemplate offers a high-level abstraction for Redis interactions. While RedisConnection offers low-level methods that accept and return binary values (byte arrays), the template takes care of serialization and connection management, freeing the user from dealing with such details. Redis stores everything as binary, so we can define which serializer to use (RedisTemplate uses a Java-based serializer for most of its operations). Here we use StringRedisTemplate, which relies on StringRedisSerializer internally.
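As a rough illustration of what a string serializer does on the way to and from Redis, the sketch below converts between String and byte[] using UTF-8. Utf8Codec is a hypothetical helper written for this post, not a Spring class; it only mirrors the idea behind StringRedisSerializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the idea of a string serializer (not a Spring class).
class Utf8Codec {
    // What goes over the wire to Redis: raw bytes.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // What the application sees again after reading: a String.
    static String deserialize(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}

class Utf8CodecDemo {
    public static void main(String[] args) {
        byte[] raw = Utf8Codec.serialize("{\"id\":\"23\"}");
        System.out.println(raw.length + " bytes -> " + Utf8Codec.deserialize(raw));
    }
}
```

With a template doing this for us, publishers and listeners can work with plain strings (here, JSON text) and never touch byte arrays.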

Next we define a channel topic. A topic can be either a channel topic or a pattern topic. The bean here is a channel topic, and we also register a pattern topic in the listener container, as below.

consumer-*        

A pattern topic allows subscriptions to all topics whose names match a specific pattern.
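To see why a subscription to consumer-* also receives messages sent to consumer-order, here is a small glob matcher. ChannelPattern is a hypothetical helper, and it is a simplification: it handles only the * and ? wildcards, whereas Redis patterns support more.

```java
import java.util.regex.Pattern;

// Hypothetical helper; supports only the '*' and '?' wildcards.
class ChannelPattern {
    private final Pattern regex;

    ChannelPattern(String glob) {
        StringBuilder sb = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                sb.append(".*");   // any run of characters, including none
            } else if (c == '?') {
                sb.append('.');    // exactly one character
            } else {
                sb.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        this.regex = Pattern.compile(sb.toString(), Pattern.DOTALL);
    }

    boolean matches(String channel) {
        return regex.matcher(channel).matches();
    }
}

class ChannelPatternDemo {
    public static void main(String[] args) {
        ChannelPattern pattern = new ChannelPattern("consumer-*");
        System.out.println(pattern.matches("consumer-order")); // a channel the pattern covers
        System.out.println(pattern.matches("order"));          // a channel it does not cover
    }
}
```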

We have now configured the connection factory and used it to provide a RedisTemplate that producers can use to write messages to a topic. Let's look at the configuration needed for a subscriber/subscription to work.

Just as RedisConnection provides low-level methods for publishing, it also provides low-level methods for subscribing.

On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching.

At the low level, RedisConnection offers the subscribe and pSubscribe methods, which map to the Redis commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or query whether it is listening, RedisConnection provides the getSubscription and isSubscribed methods.

Subscription commands in Spring Data Redis are blocking. That is, calling subscribe on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the subscription is canceled, which happens when another thread invokes unsubscribe or pUnsubscribe on the same connection. Message Listener Containers provide a solution to this problem.

Message Listener Containers

Due to its blocking nature, low-level subscription (using RedisConnection) is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers RedisMessageListenerContainer, which does all the heavy lifting. It is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).

RedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and drive the MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between a message-driven POJO and a messaging provider, and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like.
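The threading idea can be sketched with plain java.util.concurrent classes. In the hypothetical MiniListenerContainer below, a queue stands in for the subscribed Redis connection, a single thread blocks waiting for messages, and listeners run on a small worker pool. This is only an illustration of the division of labour, not how Spring implements its container.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener container (not Spring's class).
// The queue plays the role of the subscribed Redis connection.
class MiniListenerContainer {
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Thread subscriptionThread = new Thread(this::pump, "subscription");

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void start() {
        subscriptionThread.start();
    }

    // Simulates a message arriving from the broker.
    void deliver(String message) {
        incoming.add(message);
    }

    // Only this thread blocks waiting for messages; listeners run on the pool.
    private void pump() {
        try {
            while (true) {
                String message = incoming.take();
                for (Consumer<String> listener : listeners) {
                    workers.execute(() -> listener.accept(message));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() was requested
        }
    }

    void stop() {
        subscriptionThread.interrupt();
        workers.shutdown();
    }
}

class ContainerDemo {
    // Delivers one message to two listeners and returns how many handled it in time.
    static int run() {
        MiniListenerContainer container = new MiniListenerContainer();
        CountDownLatch handled = new CountDownLatch(2);
        Consumer<String> listener = message -> {
            System.out.println(Thread.currentThread().getName() + " got " + message);
            handled.countDown();
        };
        container.addListener(listener);
        container.addListener(listener);
        container.start();
        container.deliver("order-23");
        try {
            handled.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
        return 2 - (int) handled.getCount();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + ContainerDemo.run());
    }
}
```

The calling thread is never blocked by the subscription, and the listener code contains no thread or connection handling of its own.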

Thus we have created a RedisMessageListenerContainer with the connection factory and registered MessageListenerAdapter beans on it, specifying which topic each listener listens to. Since it is an adapter, we can also specify the name of the method (following the standard signature) to invoke when the listener receives a message.
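The adapter idea, calling a named method on a plain object, can be sketched with reflection. ReflectiveListenerAdapter and OrderAuditListener are hypothetical names, and the sketch assumes the handler always takes (String message, String channel); Spring's MessageListenerAdapter is more flexible than this.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified take on the adapter idea (not Spring's MessageListenerAdapter).
class ReflectiveListenerAdapter {
    private final Object delegate;
    private final Method handler;

    ReflectiveListenerAdapter(Object delegate, String methodName) {
        this.delegate = delegate;
        try {
            // Assumption: the handler is public and takes (String message, String channel).
            this.handler = delegate.getClass().getMethod(methodName, String.class, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public " + methodName + "(String, String)", e);
        }
    }

    // Called by the container for every incoming message.
    void onMessage(String message, String channel) {
        try {
            handler.invoke(delegate, message, channel);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Listener method failed", e);
        }
    }
}

// Plain object with no messaging interfaces; it only remembers the last message.
class OrderAuditListener {
    String last;

    public void listenToOrderUpdate(String message, String channel) {
        last = channel + ":" + message;
    }
}

class AdapterDemo {
    public static void main(String[] args) {
        OrderAuditListener listener = new OrderAuditListener();
        ReflectiveListenerAdapter adapter =
                new ReflectiveListenerAdapter(listener, "listenToOrderUpdate");
        adapter.onMessage("{\"id\":\"23\"}", "order");
        System.out.println(listener.last);
    }
}
```

The listener class stays free of any messaging API, which is the point of the message-driven POJO style.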

Redis Publisher

Let's now see the Java code that publishes to the Redis topic(s).

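import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Inject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component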
public class OrderPublisher {

    @Value("${redis.order.topic}")
    private String topic;
    private final StringRedisTemplate redisTemplate;

    @Inject
    public OrderPublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void publishEvent(Order order) throws JsonProcessingException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(order);
        redisTemplate.convertAndSend(topic,msg);
        redisTemplate.convertAndSend("consumer-order", msg);
    }
}
        

Here we send the same message, which carries some order info, to two topics: one named order and the other named consumer-order.

We use the convertAndSend method of RedisTemplate (the StringRedisTemplate we configured in the previous step) to send the message to a specific topic.
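The Order class itself is not shown in this post. A minimal sketch, assuming it carries only the two fields that appear in the JSON payloads used here (id and status), could look like this. In a real project it would be a public class in its own file.

```java
// Hypothetical sketch of the Order payload; the fields are assumed from the JSON used in this post.
class Order {
    private String id;
    private String status;

    // No-arg constructor plus setters, the shape a JSON mapper can populate by default.
    Order() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "Order{id=" + id + ", status=" + status + "}";
    }
}

class OrderDemo {
    public static void main(String[] args) {
        Order order = new Order();
        order.setId("23");
        order.setStatus("Processed");
        System.out.println(order);
    }
}
```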

Let's take a look at the two subscribers we configured in the RedisConfiguration.

Redis Subscriber

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class OrderEventListener {

    // Invoked by the MessageListenerAdapter with the message body and the topic name.
    public void listenToOrderUpdate(String message, String topic) throws IOException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = objectMapper.readValue(message, Order.class);
        System.out.println("got order with id " + order.getId());
    }
}        

Here we print the thread name followed by the id of the order we received.

There is another listener that is subscribed to the same channel topic order and, via the pattern topic consumer-*, also to consumer-order.

public class OrderPostProcessingEventListener {

    public void listenToOrderUpdate(String message, String topic) throws IOException {
        System.out.println(Thread.currentThread().getName());
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = objectMapper.readValue(message, Order.class);
        System.out.println("OrderPostProcessingEventListener|got order with id " + order.getId());
    }
}        

Here too we print the thread name and the order id.

We have configured the publisher, the subscribers, and the topics, and we have the Redis message broker running locally. Now we need to define a trigger for publishing these messages.

In this blog post we publish messages via a REST endpoint and also via the terminal client, redis-cli.

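import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController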
@RequestMapping("/v1/orders")
public class DemoController {

    private final OrderPublisher publisher;

    public DemoController(OrderPublisher publisher) {
        this.publisher = publisher;
    }

    @PostMapping("")
    public void publishOrderEvent(@RequestBody Order order) throws JsonProcessingException {
        publisher.publishEvent(order);
    }
}        

Here a REST call triggers the publisher to publish the order message to both topics, and the subscribers are invoked as soon as the message is available.

Let's see the output when triggering the API call below.

POST http://localhost:8080/v1/orders

{"id":"23",
"status": "Processed"
}        

In the output we see four different thread names. The first is logged by the publisher, and the other three are listener container threads, since the subscribers are invoked three times in total. One subscriber is invoked twice because it is subscribed to both topics, order and consumer-order.

Output when a message is published via redis-cli

127.0.0.1:6379> PUBLISH order "{\"id\":\"06\",\"status\":\"Processed\"}"
(integer) 1
127.0.0.1:6379> 
        

Here we publish just one message to the channel topic order via the CLI.

Since the message was published to just one topic, each of the two subscribers is invoked once, and the output logs reflect that.

Redis Pub/Sub key features:

Fire and Forget: Messages are delivered only in real time and no history is retained. Once a message is published, subscribers who missed it cannot access it.

No Message Acknowledgement: Message acknowledgements are not supported. Every subscriber receives a copy of the message, but there is no mechanism to track processing or to ensure that all subscribers receive and process each message correctly.

Redis Pub/Sub use case:

Redis Pub/Sub fits when we require low-latency, real-time message distribution without the need for message retention or complex processing. It suits applications where instant delivery is more critical than message durability.

A different use case arises when we want message history to be maintained, so that consumers can pick up messages when they come back online (based on the offset of the last consumed message) and messages can be replayed. When message durability is critical like this, Redis Streams are the better fit.

Redis Streams

A Redis stream is a data structure that acts like an append-only log but also implements several operations to overcome some of the limits of a typical append-only log. These include random access in O(1) time and complex consumption strategies, such as consumer groups.

What is a Consumer Group?

A consumer group is a group of consumers that collectively read messages from the same stream. Each message in the stream is delivered to only one consumer within the group, allowing the workload to be distributed among multiple consumers.

Message Distribution:

Load Balancing: Within a consumer group, messages are distributed among the consumers. Each new message is delivered to exactly one consumer in the group. This enables horizontal scaling, as we can add more consumers to handle a larger volume of messages.

Message Delivery Guarantees: Each message is delivered to one consumer of the group at a time, which reduces the risk of duplicate processing. A message is handled again only if it was not acknowledged and is later re-read or claimed.
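The distribution rule can be modelled in a few lines. GroupedStream below is a hypothetical in-memory model, not the Redis data structure: the group keeps one shared cursor into the log, so whichever consumer asks next gets the next undelivered entry, and no entry is handed out twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one stream read by one consumer group.
class GroupedStream {
    private final List<String> entries = new ArrayList<>();            // append-only log
    private final Map<String, List<String>> deliveries = new HashMap<>(); // consumer -> entries
    private int nextUndelivered = 0;                                    // cursor shared by the group

    synchronized void add(String entry) {
        entries.add(entry);
    }

    // Any consumer of the group may call this; each entry is handed out once.
    synchronized String readNext(String consumer) {
        if (nextUndelivered >= entries.size()) {
            return null; // nothing new for the group
        }
        String entry = entries.get(nextUndelivered++);
        deliveries.computeIfAbsent(consumer, c -> new ArrayList<>()).add(entry);
        return entry;
    }

    synchronized List<String> deliveredTo(String consumer) {
        return List.copyOf(deliveries.getOrDefault(consumer, List.of()));
    }
}

class GroupDemo {
    public static void main(String[] args) {
        GroupedStream stream = new GroupedStream();
        stream.add("order-1");
        stream.add("order-2");
        stream.add("order-3");

        // Two consumers of the same group take turns asking for work.
        stream.readNext("consumer-a");
        stream.readNext("consumer-b");
        stream.readNext("consumer-a");

        System.out.println("a: " + stream.deliveredTo("consumer-a"));
        System.out.println("b: " + stream.deliveredTo("consumer-b"));
    }
}
```

Adding a third consumer to the demo would simply spread the same entries across three readers, which is the horizontal scaling described above.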

Along similar lines, let's now enhance the application we have so far so that it can publish to a Redis stream and subscribe to the stream for messages.

Redis Configuration for Streams

First we add the stream name to our application.properties file.

redis.order.stream.key=stream-order        

Next let's enhance the redis configurations for stream subscription by creating a consumer group and attaching a consumer to it.

//-----------Redis Stream Configuration----------------------------------

    @Bean
    public Subscription subscription(StringRedisTemplate stringRedisTemplate) throws UnknownHostException {

        createConsumerGroupIfNotExists(stringRedisTemplate, streamKey, streamKey);

        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
                ObjectRecord<String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(String.class)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, String>>  container =
                StreamMessageListenerContainer
                        .create(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()), options);

        Subscription subscription =
                container.receive(Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                        streamOffset, orderStreamListener(stringRedisTemplate));

        container.start();
        return subscription;
    }

    @Bean
    public StreamListener<String, ObjectRecord<String, String>> orderStreamListener(StringRedisTemplate stringRedisTemplate) {
        // handle message from stream
        return new OrderStreamListener(stringRedisTemplate);
    }


    private void createConsumerGroupIfNotExists(StringRedisTemplate stringRedisTemplate,
                                                String streamKey, String groupName){

        try {
            // Create the group so that it starts reading from the beginning of the stream ("0-0").
            stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0-0"), groupName);
        } catch (RedisSystemException exception) {
            // Typically raised when the group already exists (BUSYGROUP); log and continue.
            Throwable cause = exception.getCause();
            System.out.println(cause != null ? cause.getMessage() : exception.getMessage());
        }
    }
        

We do not need any explicit publisher configuration, as we will use the StringRedisTemplate. Here as well we can choose between the low-level RedisConnection and the high-level StreamOperations (which we will see when we look at the Redis Stream Publisher).

For the consumer, we configured the container options with a poll timeout of 100 milliseconds. Here too we do not use the low-level RedisConnection but StreamMessageListenerContainer, which provides a high-level abstraction and manages the boilerplate code so that our consumer can focus on business logic.

Note: The consumer group is created just once (when the stream is initialized for the first time). On later startups the create call fails because the group already exists, and the exception is caught and logged.

Redis Stream Publisher

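import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Inject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component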
public class OrderStreamPublisher {

    @Value("${redis.order.stream.key}")
    private String key;
    private final StringRedisTemplate redisTemplate;

    @Inject
    public OrderStreamPublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    public void produce(Order order) throws JsonProcessingException {

        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(order);

        ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(msg)
                .withStreamKey(key);

        RecordId recordId = this.redisTemplate.opsForStream()
                .add(record);
        System.out.println(recordId);
    }
}        

The publisher uses the high-level RedisTemplate and StreamRecords, instead of the low-level RedisConnection, to create an ObjectRecord and add it to the stream.

Once the record has been added to the stream successfully, we get back a unique identifier, which is printed in the last statement.

Redis Stream Listener

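import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

public class OrderStreamListener implements StreamListener<String, ObjectRecord<String, String>> {

    // The consumer group in this demo has the same name as the stream key.
    @Value("${redis.order.stream.key}")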
    private String key;
    private final StringRedisTemplate redisTemplate;

    public OrderStreamListener(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(ObjectRecord<String, String> message) {


        String orderString = message.getValue();
        ObjectMapper objectMapper = new ObjectMapper();
        Order order = null;
        try {
            order = objectMapper.readValue(orderString, Order.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        System.out.println(order.getId());
        redisTemplate.opsForStream().acknowledge(key, message);
    }
}        

This is the listener that is invoked when the listener container gets a message from the stream. Specifically, the container calls StreamListener's onMessage method, which holds the business logic; here it just logs the message details.

Point to note: In the last line the consumer acknowledges that it has processed the message, which removes the message from the group's list of pending entries. This is how at-least-once delivery is achieved: a message that is not acknowledged by the listener stays pending and can be claimed and processed by another consumer in the same consumer group.
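The bookkeeping behind acknowledgement can be sketched as a small map of pending entries. PendingEntries is a hypothetical model: delivered, acknowledge, and claim only mirror the ideas behind reading in a group, XACK, and XCLAIM, and the entry IDs in the demo are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a consumer group's pending entries (delivered but not yet acknowledged).
class PendingEntries {
    // entry id -> consumer currently responsible for it
    private final Map<String, String> pending = new LinkedHashMap<>();

    // Called when an entry is handed to a consumer.
    void delivered(String id, String consumer) {
        pending.put(id, consumer);
    }

    // Mirrors the idea of XACK: true if the entry was pending and is now removed.
    boolean acknowledge(String id) {
        return pending.remove(id) != null;
    }

    // Mirrors the idea of XCLAIM: another consumer takes over an unacknowledged entry.
    boolean claim(String id, String newConsumer) {
        if (!pending.containsKey(id)) {
            return false;
        }
        pending.put(id, newConsumer);
        return true;
    }

    String ownerOf(String id) {
        return pending.get(id);
    }

    int size() {
        return pending.size();
    }
}

class PendingDemo {
    public static void main(String[] args) {
        PendingEntries pel = new PendingEntries();
        pel.delivered("1-0", "consumer-a");
        pel.delivered("2-0", "consumer-a");

        pel.acknowledge("1-0");         // processed successfully
        pel.claim("2-0", "consumer-b"); // consumer-a failed, consumer-b takes over

        System.out.println("still pending: " + pel.size() + ", owner of 2-0: " + pel.ownerOf("2-0"));
    }
}
```

Because an unacknowledged entry never silently disappears, it may be processed more than once, so listeners should tolerate duplicates.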

Next, let's see things in action. Just like with pub/sub, we will use an API call to trigger the addition of a message to the stream.

We enhance the controller as below, adding the stream publisher and an endpoint for it.

@RestController
@RequestMapping("/v1/orders")
public class DemoController {

    private final OrderPublisher publisher;
    private final OrderStreamPublisher orderStreamPublisher;

    public DemoController(OrderPublisher publisher, OrderStreamPublisher orderStreamPublisher) {
        this.publisher = publisher;
        this.orderStreamPublisher = orderStreamPublisher;
    }

    @PostMapping("")
    public void publishOrderEvent(@RequestBody Order order) throws JsonProcessingException {
        publisher.publishEvent(order);
    }

    @PostMapping("/stream")
    public void publishOrderStreamEvent(@RequestBody Order order) throws JsonProcessingException {
        orderStreamPublisher.produce(order);
    }
}        

The output of invoking the endpoint below is as follows.

POST http://localhost:8080/v1/orders/stream

{"id":"23",
"status": "Processed"
}        

In the output, the first line is the unique identifier logged by the publisher and the second is the order id logged by the stream consumer.

We will also publish to the same stream via redis-cli.

127.0.0.1:6379> XADD stream-order * order "{\"id\": \"123\", \"status\": \"Processing\"}"
"1723877635854-0"        

Here we use XADD to add an order and, when the message is added successfully, get back the unique id generated by Redis (since we passed * in the XADD command). The id has the following format, which makes the generated identifiers unique and monotonically increasing.

<millisecondsTime>-<sequenceNumber>        
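To show how this format stays unique and increasing, here is a small value type. StreamId is a hypothetical record written for this post; its next method sketches the rule that a later millisecond restarts the sequence at 0, while the same (or an earlier) millisecond bumps the sequence number instead.

```java
// Hypothetical value type modelling "<millisecondsTime>-<sequenceNumber>".
record StreamId(long millis, long sequence) implements Comparable<StreamId> {

    static StreamId parse(String text) {
        int dash = text.indexOf('-');
        return new StreamId(Long.parseLong(text.substring(0, dash)),
                Long.parseLong(text.substring(dash + 1)));
    }

    // ID for the next entry added at nowMillis, given this ID as the previous one.
    StreamId next(long nowMillis) {
        if (nowMillis > millis) {
            return new StreamId(nowMillis, 0);
        }
        // Same millisecond, or the clock went backwards: keep the time, bump the sequence.
        return new StreamId(millis, sequence + 1);
    }

    @Override
    public int compareTo(StreamId other) {
        int byTime = Long.compare(millis, other.millis);
        return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
    }

    @Override
    public String toString() {
        return millis + "-" + sequence;
    }
}

class StreamIdDemo {
    public static void main(String[] args) {
        StreamId last = StreamId.parse("1723877635854-0");
        System.out.println(last.next(1723877635854L)); // second entry in the same millisecond
        System.out.println(last.next(1723877635900L)); // entry in a later millisecond
    }
}
```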

On the application side, only the order id is printed, by the stream consumer.

Redis Stream use case:

Redis Streams fit when we need durable message storage, the ability to replay messages, message acknowledgements, and distributed processing of messages. They are ideal for scenarios requiring reliable delivery, consumer coordination, and access to historical data.

Summary

Here we looked at pub/sub as a pattern and dived into the Redis implementation of pub/sub, covering its configuration, features, and use cases.

We looked at the use case for fire-and-forget pub/sub as well as at leveraging Redis Streams for at-least-once message delivery. We dived into the implementation of Redis Streams and the concepts of consumer groups, offsets, and acknowledgement, and looked at the use case for streams as well. Fire-and-forget and at-least-once delivery each have their own pros and cons and suit different types of scenarios.

Sources of knowledge

https://www.baeldung.com/spring-data-redis-pub-sub

https://redis.io/docs/latest/develop/interact/pubsub/

https://docs.spring.io/spring-data/redis/reference/redis/redis-streams.html

https://docs.spring.io/spring-data/redis/reference/redis/pubsub.html

https://redis.io/docs/latest/develop/data-types/streams/
