Demystifying Redis Messaging

Redis is well-known as a cache. But it is not just a cache: over the years it has developed into a reliable key-value database, which we will discuss another time. Today, we will discuss how it turned itself into a persistent message queue in the style of Apache Kafka/Pulsar.

What is Redis?

Redis is an in-memory key-value datastore. You can use it as a global cache/HashMap in your project. A Redis Cluster contains nodes, each responsible for maintaining certain hash slots. Redis hashes your data based on the key and assigns it to the appropriate node in the cluster. All the data is stored in memory to provide efficient, high read/write throughput.

Redis Hash Slots

Redis Data Structures (List):

Redis also supports various data structures such as List, Set, and Sorted Set as values for specific keys. So you can implement a global data structure in your project using Redis. For example, you can use a list to record all login requests for a single user account.

However, people abused the list by using it as a messaging queue. We can add data to the left of the list, read from the right, and use it as a queue. However, this is a recipe for problems:

  1. What if different consumers want to read from the same queue? We have to store the history.
  2. Even if we store the history, each consumer has to maintain a local cursor and seek to its index before reading
  3. How can a single hash slot store loads of backlog data?

Redis List as Queue
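The left-push/right-pop pattern described above can be sketched locally with a plain deque standing in for the Redis list (this is an illustration of the semantics only, not a live Redis connection; `lpush`/`rpop` here mirror the LPUSH/RPOP commands):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ListAsQueue {
    // A Deque stands in for a Redis list: addFirst ~ LPUSH, pollLast ~ RPOP.
    private final Deque<String> list = new ArrayDeque<>();

    public void lpush(String value) {
        list.addFirst(value);
    }

    public String rpop() {
        // Returns null when the "queue" is empty, like RPOP returning nil.
        return list.pollLast();
    }

    public static void main(String[] args) {
        ListAsQueue queue = new ListAsQueue();
        queue.lpush("order-1");
        queue.lpush("order-2");
        // RPOP returns the oldest element first, giving FIFO order.
        System.out.println(queue.rpop()); // order-1
        System.out.println(queue.rpop()); // order-2
    }
}
```

Note that a pop is destructive: once one consumer pops an element, it is gone for everyone else, which is exactly why the history and multi-consumer problems above appear.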

Introducing Redis PubSub:

Let’s consider that we are building the backend for a delivery system like Amazon. Once an order is placed, email-notification and message-notification services have to notify the user, thanking them for the purchase. These consumer services have certain qualities:

  1. Consumer services want to read the event as soon as possible
  2. They do not mind missing an event

Consumers can subscribe to a channel. Every time an event is published on the channel, all the consumers are notified. So, it is a PUSH-based model. It has multiple issues:

  1. The time for PUBLISH is O(subscribers), because Redis pushes the event to every subscriber in real time.
  2. What if a consumer is unavailable for a few seconds? It will miss the event.
  3. What if consumers cannot keep up with the messages pushed? The client output buffer will overflow.

public long notifyOrderPubSub(Session session, String order) {
    // Build the order payload and publish it as JSON on the channel.
    Order orderPojo = Order.builder()
            .order(order)
            .userId(session.getUserId())
            .emailId(session.getEmail())
            .build();
    // PUBLISH returns the number of subscribers that received the message.
    return jedis.publish(CHANNEL_NAME, gson.toJson(orderPojo));
}

Redis node pushing data to all consumers
static class MyJedisPubSub extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        // Called on the subscriber thread for every event pushed to the channel.
        Order order = gson.fromJson(message, Order.class);
        System.out.println(Thread.currentThread().getName()
                + " received message from channel '" + channel + "': " + order.getOrder());
    }
}

The PubSub model decouples the producers and consumers, so we can now add consumers as needed to read the events. Still, I believe this model has quite a lot of issues and suits only a few use cases.

Redis Streams for Messaging:

Redis introduced Streams in 5.0, using a completely different data structure to support messaging. Each message in the stream is assigned a unique increasing timestamp and a sequence number to handle colliding timestamps.

Increasing EventId

Each range of timestamps is converted to an array and assigned to a Radix Tree node. There are two advantages to this:

  1. As all the events in a stream use the same keys, the root node stores the keys and the node array stores just the values, which saves a lot of memory
  2. The most common way of accessing data in an event queue is by time. So, the <MIN, MAX> range attached to each node can be used to skip scanning every event
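The range-skip idea can be sketched with a sorted map from each node's smallest timestamp to its batch of events: a floor lookup jumps straight to the one node whose range could contain a given time, instead of scanning every event. This is a deliberate simplification of the radix-tree layout, assuming plain millisecond keys:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeIndexedStream {
    // Each value is the batch of events stored under one tree node;
    // the key is the smallest timestamp in that batch (its MIN bound).
    private final TreeMap<Long, List<String>> nodes = new TreeMap<>();

    public void addNode(long minTs, List<String> events) {
        nodes.put(minTs, events);
    }

    // Jump to the node whose range could contain ts, in O(log nodes),
    // rather than scanning every event in the stream.
    public List<String> nodeFor(long ts) {
        Map.Entry<Long, List<String>> entry = nodes.floorEntry(ts);
        return entry == null ? List.of() : entry.getValue();
    }

    public static void main(String[] args) {
        RangeIndexedStream stream = new RangeIndexedStream();
        stream.addNode(1000L, List.of("e1", "e2"));
        stream.addNode(2000L, List.of("e3"));
        System.out.println(stream.nodeFor(1500L)); // [e1, e2] -- the 1000.. node
    }
}
```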

Single Subscription Consumer use case:

We can create consumers that subscribe to the stream and read the data in an infinite loop. However, users are responsible for paginating the data. That is, once a consumer has read, say, 1000 events starting from Feb 7th, 10 AM, it should track the last consumed timestamp and request data published to the stream after that time. Redis gives us several options to query for data:

  1. Read from a particular timestamp
  2. Blocking read that waits up to t ms on the Redis server and then sends all the messages as a batch
  3. Blocking read that waits until n messages accumulate on the Redis server and then sends them as a batch

public static void main(String[] args) {
    RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisStreamCommands<String, String> streamCommands = connection.sync();
    // "0-0" means start from the very beginning of the stream.
    String lastSeenOffset = "0-0";

    while (true) {
        // Block for up to 500 ms waiting for one entry after lastSeenOffset.
        List<StreamMessage<String, String>> streamMessages = streamCommands.xread(
                XReadArgs.Builder.block(500).count(1),
                XReadArgs.StreamOffset.from(STREAM_KEY, lastSeenOffset));
        for (StreamMessage<String, String> message : streamMessages) {
            // Advance the client-side offset; Redis does not track it for us here.
            lastSeenOffset = message.getId();
            System.out.print(lastSeenOffset + " received message");
            System.out.print(" userId: " + message.getBody().get("userId"));
            System.out.print(" order: " + message.getBody().get("order"));
            System.out.println(" email: " + message.getBody().get("e-mail"));
        }
    }
}

Redis does not maintain an acknowledged-messages list in the single-consumer case, since when the consumer dies, Redis cannot recognize the new one anyway. It is the user who tracks the offset and reads from the Redis server.
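Because the user owns the offset in this case, a crash loses the in-memory lastSeenOffset unless it is checkpointed somewhere durable. Below is a minimal sketch that persists the offset to a local file; the file path and class name are my own, not part of any Redis client API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class OffsetCheckpoint {
    private final Path file;

    public OffsetCheckpoint(Path file) {
        this.file = file;
    }

    // Persist the last processed stream ID after each batch.
    public void save(String offset) throws IOException {
        Files.writeString(file, offset);
    }

    // On restart, resume from the saved ID, or "0-0" for a fresh start.
    public String load() throws IOException {
        return Files.exists(file) ? Files.readString(file).trim() : "0-0";
    }

    public static void main(String[] args) throws IOException {
        Path path = Path.of(System.getProperty("java.io.tmpdir"), "last-seen-offset.txt");
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(path);
        System.out.println("resuming from: " + checkpoint.load());
        checkpoint.save("1526919030474-55"); // would be message.getId() in the read loop
    }
}
```

Note this is at-least-once at best: if the process crashes after processing a batch but before save(), the batch is replayed on restart.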

Consumer Groups:

Consumer groups are well known in messaging queues. For each topic, we can define a set of consumers and tag them with the same consumer group name. This helps us process all the messages for the subscription in parallel.

Redis also provides the concept of a consumer group. It internally uses a round-robin strategy to allocate a set of messages to each consumer and maintains the consumer group's offset on the Redis server as a whole. So, it is easy to add and delete consumers in the group.

  1. However, once a set of messages is delivered to consumer1 and it crashes, there is no built-in feature to time the messages out and re-deliver them to the other consumers
  2. So, each consumer has to explicitly claim pending messages using XAUTOCLAIM and process them.

while (true) {
    List<StreamMessage<String, String>> messages = new ArrayList<>();
    // Read up to 3 new messages delivered to this consumer within the group.
    List<StreamMessage<String, String>> newMessages = streamCommands.xreadgroup(
            Consumer.from(CONSUMER_GROUP, CONSUMER),
            XReadArgs.Builder.count(3),
            XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
    // Claim up to 5 messages that have been pending for over 60 s on another consumer.
    ClaimedMessages<String, String> claimedMessages = streamCommands.xautoclaim(
            STREAM_KEY,
            XAutoClaimArgs.Builder.xautoclaim(Consumer.from(CONSUMER_GROUP, CONSUMER), 60000L, "0-0").count(5));
    messages.addAll(newMessages);
    messages.addAll(claimedMessages.getMessages());
    for (StreamMessage<String, String> message : messages) {
        System.out.print(message.getId() + " received message");
        System.out.print(" userId: " + message.getBody().get("userId"));
        System.out.print(" order: " + message.getBody().get("order"));
        System.out.println(" email: " + message.getBody().get("e-mail"));
        // Acknowledge so the entry leaves the group's pending list.
        streamCommands.xack(STREAM_KEY, CONSUMER_GROUP, message.getId());
    }
}

Why Redis Streams might not be up to the mark of Kafka/Pulsar:

  1. Kafka and Pulsar have the concept of partitioning along with consumer groups, which helps scale both producers and consumers. Instead of overloading a single broker, producers send data to a partition maintained by a broker, so the load is distributed across multiple brokers. Along with the parallelism of consumer groups, partitions also let us maintain ordering within each partition. Redis does not support partitions.

  2. Redis does not retry pending, timed-out messages on its own. I believe this should be a basic feature for any messaging queue. Currently, every user has to explicitly read the pending messages and process them.
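Partitioning can be approximated on the client side by hashing a key onto one of several streams (for example orders-0 through orders-3), so that all events for one key land on the same stream and keep their relative order. The stream-naming convention here is my own workaround sketch, not a Redis feature:

```java
public class StreamPartitioner {
    private final String baseName;
    private final int partitions;

    public StreamPartitioner(String baseName, int partitions) {
        this.baseName = baseName;
        this.partitions = partitions;
    }

    // All events for one key map to the same stream, preserving per-key order.
    // floorMod keeps the slot non-negative even when hashCode() is negative.
    public String streamFor(String key) {
        int slot = Math.floorMod(key.hashCode(), partitions);
        return baseName + "-" + slot;
    }

    public static void main(String[] args) {
        StreamPartitioner partitioner = new StreamPartitioner("orders", 4);
        // The same user always maps to the same partition stream.
        System.out.println(partitioner.streamFor("user-42"));
        System.out.println(partitioner.streamFor("user-42"));
    }
}
```

A producer would XADD to streamFor(userId), and each consumer group member would subscribe to one or more of the partition streams. The trade-off is that the client, not the broker, now owns rebalancing when partitions or consumers change.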


I am surprised to see all these complex and sophisticated features in the well-known key-value data store that I used to think of as just a cache. I do see companies using Redis in production as a message queue, and its simplicity compared to other messaging queues gives it a great advantage. However, I believe there is still work to be done: I cannot use it with full confidence in production while working with billions of events per second. But it is well suited for mid-scale applications.

