Demystifying Redis Messaging
Redis is well-known as a cache, but it is more than that: over the years it has grown into a reliable key-value database, which we will discuss another time. Today, we will look at how it turned itself into a persistent message queue in the spirit of Apache Kafka/Pulsar.
What is Redis?
Redis is an in-memory key-value datastore; you can use it as a global cache/HashMap in your project. A Redis Cluster contains nodes, each responsible for maintaining a range of hash slots. Redis hashes your data based on the key and assigns it to the appropriate node in the cluster. All the data is stored in memory to provide efficient, high read/write throughput.
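The key-to-node routing can be illustrated with the CRC16-based slot computation the Redis Cluster specification describes (slot = CRC16(key) mod 16384). A minimal sketch; it ignores hash tags like `{user}` that real cluster clients honor:

```java
import java.nio.charset.StandardCharsets;

// Sketch of how a Redis Cluster client maps a key to one of the
// 16384 hash slots: slot = CRC16(key) mod 16384, using the
// CRC-16/XMODEM variant the cluster spec mandates.
// Note: real clients also special-case hash tags ({...}), omitted here.
public class RedisSlot {
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                // Polynomial 0x1021, no reflection, init 0x0000
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    public static int slot(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        // The same key always routes to the same slot (and hence node).
        System.out.println("user:1001 -> slot " + slot("user:1001"));
        System.out.println("user:1002 -> slot " + slot("user:1002"));
    }
}
```

Because the mapping is deterministic, any client in the cluster agrees on which node owns a given key without coordination.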
Redis Data Structures (List):
Redis also supports various data structures, such as List, Set, and Sorted Set, as values for specific keys, so you can implement a global data structure in your project using Redis. For example, you can use a List to record all login requests for a single user account.
However, people have abused the List by using it as a messaging queue: push data to the left of the list, pop from the right, and treat it as a queue. This is a recipe for problems, because a popped message is removed for good, with no acknowledgement or redelivery.
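The LPUSH/RPOP semantics can be sketched with a plain deque, no Redis server needed; the comments map each step to the equivalent Redis command, and the sketch makes the core flaw visible:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simulates using a Redis List as a queue: LPUSH adds to the head,
// RPOP removes from the tail (FIFO overall). Against a real server the
// same shape would be jedis.lpush(key, msg) / jedis.rpop(key).
public class ListAsQueue {
    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();

        // Producer side: LPUSH order-events "order-1" ...
        queue.addFirst("order-1");
        queue.addFirst("order-2");
        queue.addFirst("order-3");

        // Consumer side: RPOP order-events -- oldest message comes out first.
        String msg = queue.pollLast();
        System.out.println("consumed: " + msg); // consumed: order-1

        // The problem: the pop already deleted the message. If the consumer
        // crashes right here, "order-1" is gone -- no ack, no redelivery,
        // and no way for another consumer to ever see it.
    }
}
```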
Introducing Redis PubSub:
Let’s say we are building the backend for a delivery system like Amazon. Once an order is placed, an e-mail notification service and a message notification service have to thank the user for the purchase. These consumer services share certain qualities.
Consumers can subscribe to a channel, and every time an event is published on the channel all connected consumers are notified. So it is a PUSH-based model, and it has multiple issues: delivery is fire-and-forget, nothing is persisted, and a consumer that is offline at publish time simply misses the message.
public long notifyOrderPubSub(Session session, String order) {
    Order orderPojo = Order.builder()
            .order(order)
            .userId(session.getUserId())
            .emailId(session.getEmail())
            .build();
    // PUBLISH returns the number of subscribers that received the message
    return jedis.publish(CHANNEL_NAME, gson.toJson(orderPojo));
}
static class MyJedisPubSub extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        Order order = gson.fromJson(message, Order.class);
        System.out.println(Thread.currentThread().getName()
                + " received message from channel '" + channel + "': " + order.getOrder());
    }
}
// Each consumer registers the handler with a blocking call:
// subscriberJedis.subscribe(new MyJedisPubSub(), CHANNEL_NAME);
The PubSub model decouples the producers and consumers, so we can now add consumers as needed to read the events. Still, I believe this model has quite a few issues and suits only a narrow set of use cases.
Redis Streams for Messaging:
Redis introduced Streams in version 5.0, using a completely different data structure to support messaging. Each message in the stream is assigned a unique ID made of an increasing millisecond timestamp plus a sequence number that handles colliding timestamps.
Each range of IDs is packed into a compact array attached to a node of a radix tree keyed by ID. There are two advantages to this: the packed arrays keep the per-message memory overhead low, and the radix tree makes range queries by ID fast.
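The ID scheme above can be sketched as follows: each entry gets `<millis>-<seq>`, where the sequence bumps only when two entries share the same millisecond, keeping IDs strictly increasing. This is a simplified model of the server's behavior, not its actual implementation:

```java
// Simplified model of Redis Stream ID assignment: an ID is
// "<msTimestamp>-<sequence>"; the sequence disambiguates entries
// created within the same millisecond, so IDs stay strictly increasing.
public class StreamIdGen {
    private long lastMs = -1;
    private long seq = 0;

    public String nextId(long nowMs) {
        if (nowMs == lastMs) {
            seq++;          // same millisecond: bump the sequence number
        } else {
            lastMs = nowMs;
            seq = 0;        // new millisecond: reset the sequence
        }
        return lastMs + "-" + seq;
    }

    public static void main(String[] args) {
        StreamIdGen gen = new StreamIdGen();
        System.out.println(gen.nextId(1675766400000L)); // 1675766400000-0
        System.out.println(gen.nextId(1675766400000L)); // 1675766400000-1
        System.out.println(gen.nextId(1675766400001L)); // 1675766400001-0
    }
}
```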
Single Subscription Consumer use case:
We can create a consumer that subscribes to the stream and reads the data in an infinite loop. However, the user is responsible for paginating the data: if a consumer has read, say, 1,000 events starting from Feb 7th, 10 AM, it should track the last consumed ID and request only the entries published to the stream after that point. Redis gives us plenty of options for querying this data.
public static void main(String[] args) {
    RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisStreamCommands<String, String> streamCommands = connection.sync();
    String lastSeenOffset = "0-0";   // start from the beginning of the stream
    while (true) {
        // Block up to 500 ms waiting for one entry after the last seen ID
        List<StreamMessage<String, String>> streamMessages = streamCommands.xread(
                XReadArgs.Builder.block(500).count(1),
                XReadArgs.StreamOffset.from(STREAM_KEY, lastSeenOffset));
        for (StreamMessage<String, String> message : streamMessages) {
            lastSeenOffset = message.getId();   // advance our own offset
            System.out.print(lastSeenOffset + " received message");
            System.out.print(" userId: " + message.getBody().get("userId"));
            System.out.print(" order: " + message.getBody().get("order"));
            System.out.println(" email: " + message.getBody().get("e-mail"));
        }
    }
}
In the single-consumer case Redis does not maintain a list of acknowledged messages: if the consumer dies, Redis cannot recognize its replacement anyway. It is the user who tracks the offset and reads from the Redis server accordingly.
Consumer Groups:
Consumer groups are a well-known concept in messaging queues: for each topic we define a set of consumers and tag them with the same consumer-group name, which lets us process all the messages for the subscription in parallel.
Redis also provides the concept of a consumer group. It hands each entry to exactly one consumer in the group (roughly round-robin across the active readers) and maintains the group's offset (the last-delivered ID) on the Redis server as a whole, so it is easy to add and remove consumers in the group.
while (true) {
    List<StreamMessage<String, String>> messages = new ArrayList<>();
    // Read up to 3 new entries delivered to this consumer of the group
    List<StreamMessage<String, String>> newMessages = streamCommands.xreadgroup(
            Consumer.from(CONSUMER_GROUP, CONSUMER),
            XReadArgs.Builder.count(3),
            XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
    // Also claim entries pending for over 60 s (their original consumer likely died)
    ClaimedMessages<String, String> claimedMessages = streamCommands.xautoclaim(
            STREAM_KEY,
            XAutoClaimArgs.Builder
                    .xautoclaim(Consumer.from(CONSUMER_GROUP, CONSUMER), (long) 60000, "0-0")
                    .count(5));
    messages.addAll(newMessages);
    messages.addAll(claimedMessages.getMessages());
    for (StreamMessage<String, String> message : messages) {
        System.out.print(message.getId() + " received message");
        System.out.print(" userId: " + message.getBody().get("userId"));
        System.out.print(" order: " + message.getBody().get("order"));
        System.out.println(" email: " + message.getBody().get("e-mail"));
        // Acknowledge so the entry leaves the group's pending entries list
        streamCommands.xack(STREAM_KEY, CONSUMER_GROUP, message.getId());
    }
}
Why Redis Streams might not be on par with Kafka/Pulsar:
1. Kafka and Pulsar pair consumer groups with the concept of partitioning, which helps us scale both producers and consumers. Instead of overloading a single broker, producers send the data to a partition maintained by a broker, so the load can be distributed across multiple brokers. Along with the parallelism of consumer groups, partitions also let us maintain ordering within each partition. Redis Streams do not support partitions.
2. Redis does not automatically retry pending messages that time out; I believe this should be a basic feature for any messaging queue. As it stands, every user has to explicitly read the pending messages and process them.
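The partitioning gap in point 1 can be worked around by maintaining N stream keys and hashing a partition key (say, the userId) to pick one, which preserves per-user ordering. A hypothetical sketch of the key-routing part; the stream names and partition count are assumptions:

```java
// Hypothetical sketch: emulating Kafka-style partitions on top of
// Redis Streams by hashing a partition key (here the userId) to one
// of N stream keys. Events for the same user always land in the same
// stream, preserving per-user ordering, while consumers scale out with
// one consumer group per partition stream.
public class StreamPartitioner {
    private final int partitions;
    private final String baseKey;   // e.g. "orders" -> orders-0 .. orders-3

    public StreamPartitioner(String baseKey, int partitions) {
        this.baseKey = baseKey;
        this.partitions = partitions;
    }

    public String streamKeyFor(String partitionKey) {
        // floorMod guards against negative hashCode values
        int p = Math.floorMod(partitionKey.hashCode(), partitions);
        return baseKey + "-" + p;
    }

    public static void main(String[] args) {
        StreamPartitioner partitioner = new StreamPartitioner("orders", 4);
        // A producer would XADD to this key; a dedicated consumer group
        // would XREADGROUP from each partition stream independently.
        System.out.println(partitioner.streamKeyFor("user-42"));
        // The same user always maps to the same stream:
        System.out.println(partitioner.streamKeyFor("user-42"));
    }
}
```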
I am surprised to see all these complex, sophisticated features in the well-known key-value store that I used to think of as just a cache. I do see companies using Redis as a message queue in production, and its simplicity compared to other messaging queues is a great advantage. However, I believe there is some work left to be done: I could not use it with full confidence in production while working with billions of events per second, but it is well suited to mid-scale applications.