A Gentle Intro for RabbitMQ
Image credit: rabbitmq.com

In this article, we are going to talk about message queues: what they are and when they are used. Then we will explore RabbitMQ and its components as a practical example.

Message queues are an extra layer that stores requests to be processed at some point in the future. This layer may exist for a variety of reasons: maybe the requests are resource-intensive and we don't want our servers suffocating under them, maybe we need to process a batch of similar requests together, or maybe the business model requires it for some application-specific reason. They are simply queues whose elements are requests.
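As a rough illustration of the idea, here is a minimal in-memory sketch in C#. It has nothing to do with RabbitMQ yet: QueueSketch and ProcessAll are hypothetical names made up for this example, and the "requests" are plain strings.

```csharp
using System;
using System.Collections.Generic;

public static class QueueSketch
{
    // Drains the queue in arrival order and returns what was processed.
    public static List<string> ProcessAll(Queue<string> pending)
    {
        var processed = new List<string>();
        while (pending.Count > 0)
        {
            // Dequeue removes and returns the oldest request (FIFO).
            processed.Add(pending.Dequeue());
        }
        return processed;
    }

    public static void Main()
    {
        var pending = new Queue<string>();

        // The producer only enqueues; it does not wait for the work to be done.
        pending.Enqueue("resize image 1");
        pending.Enqueue("resize image 2");
        pending.Enqueue("send report");

        // Later, a worker picks the requests up at its own pace.
        foreach (var request in ProcessAll(pending))
        {
            Console.WriteLine("processed: " + request);
        }
    }
}
```

The producer and the worker never call each other directly; the queue in the middle is the only thing they share.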

Another great advantage of this architecture is that it reduces coupling between different services. In a micro-services architecture, this also acts as a hub for connecting all the services together instead of a mesh where each service needs to be directly connected to all other services that it needs to communicate with.


RabbitMQ

The official RabbitMQ site describes the product as a message broker. In our context, the broker is the system responsible for taking a message from the sender and making sure the receiver gets it. This implies that it can be used for more than just queueing jobs. In addition to serving as a job (process) queue, RabbitMQ can also be used as:

  • A publisher/subscriber system
  • An RPC (remote procedure call) system

RabbitMQ is built on AMQP (Advanced Message Queuing Protocol). In order to understand RabbitMQ, we need to understand AMQP and its components, which we cover in the next section.


AMQP

AMQP is a TCP-based protocol responsible for the communication between the sender and the broker, and between the broker and the receiver.

The AMQP architecture introduces some abstractions that are a little more complex than just a sender and a receiver. The sender is referred to as the publisher and the receiver as the consumer. Between the publisher and the consumer lies our broker server (RabbitMQ), which plays the role of a post office.

For the communication between publisher and broker, and between consumer and broker, AMQP introduces channels, which are the communication gates of the broker. When an application (publisher or consumer) wants to talk to the broker, it opens an AMQP connection. If the same application needs another line of communication, it doesn't have to open a second connection; it can open another channel within the same one. This is mainly an optimization, because TCP connections are resource-consuming.

Inside the broker server, there are 3 main abstractions:

  • Exchanges: This is what receives messages from the publisher (the department inside the post office, our broker, where you hand in a letter to be sent).
  • Queues: This is where you get your messages as a consumer (the mailbox in front of your house).
  • Bindings: These are the rules (contracts) of communication between exchanges and queues. Each queue defines them to tell the broker which messages it wants from the exchange it is connected to.

AMQP is programmable, which means the application itself declares the entities it needs, such as exchanges and queues, and defines the bindings between them. You can read more about it here.
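To make the three abstractions concrete, here is a small in-memory sketch of declaring entities and binding them. It is not the RabbitMQ client API: ToyBroker and its methods (DeclareExchange, DeclareQueue, Bind, Publish, Consume) are hypothetical names invented for illustration, and the sketch only models the simplest routing rule, where a binding key must equal the message's routing key.

```csharp
using System;
using System.Collections.Generic;

// A toy, in-memory stand-in for a broker (NOT the RabbitMQ client API).
public class ToyBroker
{
    // exchange name -> bindings as (binding key, queue name) pairs
    private readonly Dictionary<string, List<(string Key, string Queue)>> bindings =
        new Dictionary<string, List<(string Key, string Queue)>>();

    // queue name -> messages waiting for a consumer
    private readonly Dictionary<string, Queue<string>> queues =
        new Dictionary<string, Queue<string>>();

    public void DeclareExchange(string exchange)
    {
        if (!bindings.ContainsKey(exchange))
            bindings[exchange] = new List<(string Key, string Queue)>();
    }

    public void DeclareQueue(string queue)
    {
        if (!queues.ContainsKey(queue))
            queues[queue] = new Queue<string>();
    }

    // A binding is a rule: "this queue wants messages with this key".
    // The exchange and the queue must have been declared first.
    public void Bind(string queue, string exchange, string bindingKey)
    {
        bindings[exchange].Add((bindingKey, queue));
    }

    // Copies the message into every queue whose binding key equals the
    // routing key, and returns how many queues received it.
    public int Publish(string exchange, string routingKey, string message)
    {
        int delivered = 0;
        foreach (var (key, queue) in bindings[exchange])
        {
            if (key == routingKey)
            {
                queues[queue].Enqueue(message);
                delivered++;
            }
        }
        return delivered;
    }

    // Returns the oldest waiting message, or null if the queue is empty.
    public string Consume(string queue)
    {
        return queues[queue].Count > 0 ? queues[queue].Dequeue() : null;
    }
}

public static class ToyBrokerDemo
{
    public static void Main()
    {
        var broker = new ToyBroker();
        broker.DeclareExchange("orders");
        broker.DeclareQueue("billing");
        broker.DeclareQueue("shipping");
        broker.Bind("billing", "orders", "paid");
        broker.Bind("shipping", "orders", "paid");

        Console.WriteLine(broker.Publish("orders", "paid", "order 42 paid"));      // 2
        Console.WriteLine(broker.Publish("orders", "refunded", "order 7 refund")); // 0
        Console.WriteLine(broker.Consume("billing"));                               // order 42 paid
    }
}
```

Note that the publisher only names an exchange and a routing key. Which queues end up with the message is decided entirely by the bindings.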


Exchange Types

When an exchange receives a message, it routes the message to one or more queues. Several factors govern the routing. The first is the type of the exchange itself, which determines how RabbitMQ matches messages to queues. There are 4 types of exchanges:

  • Fanout exchange: This exchange type just broadcasts all the messages it receives to all the queues it knows about. This is perfect for a publisher/subscriber system, like a global notifier in multi-player games. A complete example of a pub/sub system can be found here.
  • Direct exchange: This exchange type delivers a message only to the queues whose binding was declared with a routingKey equal to the message's routingKey.
  • Topic exchange: This exchange type provides more flexible filtering on the routingKey. The routingKey must be a list of words delimited by dots, and the pattern a queue binds with can contain two special characters: '*', which substitutes for exactly one word, and '#', which substitutes for zero or more words.
  • Headers exchange: This exchange type routes messages on multiple attributes. It ignores the routingKey and routes using the message's headers attribute instead. Header values do not have to be strings; a value can be, for example, an integer or a hash.
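The topic wildcards are easier to see in code. Below is a minimal sketch of the matching rule described above, written from that description rather than taken from RabbitMQ's implementation. TopicMatcher and Matches are hypothetical names, and the sample keys are made up.

```csharp
using System;

public static class TopicMatcher
{
    // True if the dot-delimited routing key satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' may absorb zero or more words: try every possible length.
            for (int next = j; next <= k.Length; next++)
            {
                if (Match(p, i + 1, k, next)) return true;
            }
            return false;
        }

        // Anything else needs exactly one word from the key.
        if (j == k.Length) return false;
        if (p[i] == "*" || p[i] == k[j]) return Match(p, i + 1, k, j + 1);
        return false;
    }

    public static void Main()
    {
        Console.WriteLine(Matches("*.orange.*", "quick.orange.rabbit"));      // True
        Console.WriteLine(Matches("*.orange.*", "quick.orange.male.rabbit")); // False
        Console.WriteLine(Matches("lazy.#", "lazy"));                         // True
        Console.WriteLine(Matches("lazy.#", "lazy.brown.fox"));               // True
    }
}
```

A pattern with no wildcards behaves exactly like a direct exchange binding, which is why a topic exchange can be seen as the more general of the two.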


A Taste of RabbitMQ

In this section, we will use RabbitMQ as an RPC system. This example is fully described here in the official docs. I tweaked it a little bit.

An RPC (remote procedure call) just means that we want to execute a function on some remote server. If we are going to execute a function, then most likely we will want to receive the result, or at least be notified that the function got executed. Now you can see that RabbitMQ isn't behaving as just a message queue but also as an interface of communication between 2 servers, or more specifically, 2 services.

In a micro-services architecture, this can be a great way of connecting your services. Your RPC client sends a message along with the address of a queue, called the callback queue, that will receive the response from the RPC server and hand it back to the RPC client. Sounds great, right?

Not exactly! Clearly we don't want to create a new queue every time a service wants to communicate with another. Why not use the same queue many times? To do that, we need a way to identify which response message belongs to which request.

For this, we can simply set an attribute on the message whose value is unique per request. This is the CorrelationId attribute.

So a bird's-eye view of the process looks something like this:

  1. There will be a direct exchange called "RPC" handling all RPC requests.
  2. When any Client starts up, it creates 2 queues: a named queue for receiving RPC requests from other services (its name is the service name) and an anonymous exclusive callback queue for receiving the responses to its own RPC requests.
  3. To make an RPC request, the Client publishes a message to the "RPC" exchange with a routingKey equal to the name of the service it is requesting. The message has two properties: ReplyTo, which is set to the callback queue, and CorrelationId, which is set to a unique value for every request.
  4. The request is sent to the queue whose name is the requested service name.
  5. The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the ReplyTo property.
  6. The client waits for data on the callback queue. When a message appears, it checks the CorrelationId property. If it matches the value from the request, it returns the response to the application.
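Step 6 is the part that lets one callback queue serve many requests. Here is a minimal sketch of that bookkeeping on its own, with no broker involved. PendingCalls, Register and Resolve are hypothetical names for illustration and are not part of the RabbitMQ client.

```csharp
using System;
using System.Collections.Generic;

public class PendingCalls
{
    // CorrelationId -> the request that is still waiting for its reply.
    private readonly Dictionary<string, string> waiting =
        new Dictionary<string, string>();

    // Called when a request is sent: remember it under a fresh, unique id.
    public string Register(string request)
    {
        string correlationId = Guid.NewGuid().ToString();
        waiting[correlationId] = request;
        return correlationId;
    }

    // Called for each message that shows up on the callback queue.
    // Returns the request this reply belongs to, or null when the id is
    // unknown, in which case the reply should simply be ignored.
    public string Resolve(string correlationId)
    {
        if (waiting.TryGetValue(correlationId, out string request))
        {
            waiting.Remove(correlationId);
            return request;
        }
        return null;
    }
}

public static class PendingCallsDemo
{
    public static void Main()
    {
        var calls = new PendingCalls();
        string idA = calls.Register("fib(10)");
        string idB = calls.Register("fib(30)");

        // Replies may arrive in any order; the id tells them apart.
        Console.WriteLine(calls.Resolve(idB) ?? "(ignored)");          // fib(30)
        Console.WriteLine(calls.Resolve("unknown-id") ?? "(ignored)"); // (ignored)
        Console.WriteLine(calls.Resolve(idA) ?? "(ignored)");          // fib(10)
        Console.WriteLine(calls.Resolve(idA) ?? "(ignored)");          // (ignored)
    }
}
```

Ignoring unknown ids rather than failing is deliberate: a reply can arrive for a request the client has already given up on.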

The following image explains the whole process. In it, "rpc_queue" stands for the "SERVICE_NAME" queue, which is bound to the "RPC" exchange with a routingKey equal to "SERVICE_NAME".

[Image: diagram of the RPC process]

An example of an RPC request will be similar to this:

channel.BasicPublish(
            exchange: "RPC",             // The global exchange for RPC
            routingKey: "SERVICE_NAME",  // The service we are requesting
            basicProperties: props,      // holds ReplyTo and CorrelationId
            body: messageBytes);         /* parameters to the RPC function,
                                            encoded as bytes; here we send a
                                            number to get its Fibonacci value */

Now the fun part, the code. I used the C# .NET example code, but feel free to check the site for your language of choice. I commented almost every line of the code below. Note: I didn't repeat most of the comments in the client code because they are the same as in the server.

The RPC server code:

using System;
using RabbitMQ.Client;          // The nuget package
using RabbitMQ.Client.Events;   /* The events handler to know when we    
                                   get a new message (RPC request) */
using System.Text;

class RPCServer
{
    public static void Main()
    {
        /* Create a connection factory for a RabbitMQ server running on
         the same machine (ConnectionFactory follows the factory design pattern) */
        var factory = new ConnectionFactory() { HostName = "localhost" };

        // Open a connection and create a channel for the communication
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
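            // Declare the direct exchange that handles all RPC requests
            channel.ExchangeDeclare(exchange: "RPC", type: ExchangeType.Direct);

            // Declare a queue to receive requests
            channel.QueueDeclare(queue: "SERVICE_NAME", durable: false,
              exclusive: false, autoDelete: false, arguments: null);

            // Bind the queue to the exchange under the service name
            channel.QueueBind(queue: "SERVICE_NAME", exchange: "RPC",
              routingKey: "SERVICE_NAME");

            // Deliver at most one unacknowledged request at a time to this
            // worker (see Fair Dispatch in the Reliability section)
            channel.BasicQos(0, 1, false);

            // Create an event listener to fire when we receive a request
            var consumer = new EventingBasicConsumer(channel);

            // Start consuming from the queue with manual acknowledgements
            channel.BasicConsume(queue: "SERVICE_NAME",
              autoAck: false, consumer: consumer);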

            // Console feedback
            Console.WriteLine(" [x] Awaiting RPC requests");

            // Callback to be called when we receive an RPC request
            consumer.Received += (model, ea) =>
            {
                string response = null;

                // Extract message body
                var body = ea.Body.ToArray();
                
                // Extract ReplyTo and CorrelationID properties
                var props = ea.BasicProperties;

                // Create a properties instance for the reply
                var replyProps = channel.CreateBasicProperties();

                // Set the CorrelationId of the Reply
                replyProps.CorrelationId = props.CorrelationId;


                try
                {
                    // Decode the body from binary to string
                    var message = Encoding.UTF8.GetString(body);

                    // Convert string into integer
                    int n = int.Parse(message);

                    // Console feedback
                    Console.WriteLine(" [.] fib({0})", message);

                    // CALL THE RPC FUNCTION WITH THE REQUESTED VALUE
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    // Catch in case of failure 
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    // Encode response into the binary form
                    var responseBytes = Encoding.UTF8.GetBytes(response);

                    // Publish the RPC response through the default exchange
                    // (""), which routes by queue name to the callback queue
                    channel.BasicPublish(exchange: "",
                                         routingKey: props.ReplyTo,
                                         basicProperties: replyProps,
                                         body: responseBytes);

                    /* Manual acknowledgement that we received the message
                       and finished handling it (see Reliability below) */
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            // Console feedback
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }


    // This is the Fibonacci implementation.
    // Assumes only valid non-negative integer input.
    // Don't expect this one to work for big numbers, and it's
    // probably the slowest recursive implementation possible.
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

The RPC client code:

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

/*
  To illustrate how an RPC service could be used we're going to create a  
  simple client class. It's going to expose a method named Call which sends
  an RPC request and blocks until the answer is received.
  It will be named RpcClient.
*/
public class RpcClient
{
    /*
       By .NET naming convention, a type whose name begins with a
       capital "I" (IConnection, IModel, ...) is an interface.
       Depending on interfaces rather than concrete classes is what
       makes patterns such as dependency injection possible.
    */
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();

    // CorrelationId of the request in flight; Call sets a new one each time
    private volatile string correlationId;

    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();

        // Make sure the "RPC" exchange exists before publishing to it
        channel.ExchangeDeclare(exchange: "RPC", type: ExchangeType.Direct);

        // Anonymous, exclusive callback queue named by the broker
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);

            // Making sure the CorrelationId matches ours
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };

        // Start listening on the callback queue once, not on every call
        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);
    }

    public string Call(string message)
    {
        // A fresh CorrelationId for every request
        correlationId = Guid.NewGuid().ToString();
        var props = channel.CreateBasicProperties();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "RPC",
            routingKey: "SERVICE_NAME",
            basicProperties: props,
            body: messageBytes);

        // Block until the matching response arrives
        return respQueue.Take();
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}


Reliability

  • Durability: Exchanges and queues can survive a broker restart if they are declared as durable. For the messages to survive too, we need to declare the queue as durable and mark the messages as persistent. This tells RabbitMQ to write the messages to disk instead of keeping them only in RAM.
// We declare the queue as durable (and neither exclusive nor auto-deleted)
channel.QueueDeclare(queue: "QUEUE_NAME", durable: true,
  exclusive: false, autoDelete: false, arguments: null);

// We mark the messages as persistent, then publish with these properties
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
  • Message acknowledgements: We send the ack only after the task is processed. This way, even if a worker dies before fully processing a task, we won't lose it, as RabbitMQ will re-queue it.
// We manually send the ack after we process the task in the callback
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);


// We turn off auto acknowledgment because we are manually doing it
channel.BasicConsume(queue: "QUEUE_NAME", autoAck: false, consumer: consumer);

  • Fair Dispatch: With a prefetch count of 1, RabbitMQ does not push a new task to a consumer until that consumer has acknowledged the previous one. When there is more than one consumer, this spreads the jobs according to who is actually free instead of blindly handing them out in turn.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
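
To see why fair dispatch matters, here is a rough simulation that compares handing tasks out in turn with handing each task to whichever worker frees up first. It is a simplified model and not RabbitMQ code: DispatchSketch, RoundRobin and PrefetchOne are hypothetical names, the task durations are made up, and the model assumes a worker acks the moment it finishes a task.

```csharp
using System;
using System.Linq;

public static class DispatchSketch
{
    // Tasks are handed out strictly in turn, regardless of how busy a worker is.
    // Returns the time at which the last worker finishes.
    public static int RoundRobin(int[] durations, int workers)
    {
        var busyUntil = new int[workers];
        for (int i = 0; i < durations.Length; i++)
        {
            busyUntil[i % workers] += durations[i];
        }
        return busyUntil.Max();
    }

    // Each task goes to the worker that becomes free first, which is what
    // limiting every worker to one unacknowledged task achieves.
    public static int PrefetchOne(int[] durations, int workers)
    {
        var busyUntil = new int[workers];
        foreach (int duration in durations)
        {
            int firstFree = Array.IndexOf(busyUntil, busyUntil.Min());
            busyUntil[firstFree] += duration;
        }
        return busyUntil.Max();
    }

    public static void Main()
    {
        // Alternating heavy and light tasks, two workers.
        int[] durations = { 9, 1, 9, 1, 9, 1 };

        Console.WriteLine(RoundRobin(durations, 2));  // 27
        Console.WriteLine(PrefetchOne(durations, 2)); // 19
    }
}
```

With in-turn dispatch, one worker receives every heavy task while the other sits idle. With a prefetch count of 1, the idle worker picks up the next task as soon as it is free.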


RabbitMQ Clients

After you install RabbitMQ, either locally or in a Docker container, you can use the client for your favourite language to interact with it from your application. There are clients for many languages. You can find more about them here.

The official docs also provide excellent getting-started tutorials in many languages.


Thanks for reading!
