A Gentle Intro for RabbitMQ
In this article, we are going to talk about message queues: what they are and when they are used. We will then explore RabbitMQ and its components as a practical example.
Message queues are an extra layer that stores requests so they can be processed at some point in the future. This layer may exist for a variety of reasons: maybe the requests are resource-intensive and we don't want them to overwhelm our servers, maybe we need to process a batch of similar requests together, or maybe there is some application-specific reason that the business model requires. They are simply queues whose elements are requests.
Another great advantage of this architecture is that it reduces coupling between different services. In a micro-services architecture, this also acts as a hub for connecting all the services together instead of a mesh where each service needs to be directly connected to all other services that it needs to communicate with.
RabbitMQ
The official RabbitMQ site describes the product as a message broker. In our context, the broker is the system responsible for taking a message from the sender and making sure the receiver gets it. This implies that it can be used for more than just queueing jobs. In addition to serving as a job (process) queue, RabbitMQ can also be used as:
- publisher/subscriber system
- RPC(remote procedure call) system
RabbitMQ is built on AMQP (Advanced Message Queuing Protocol). To understand RabbitMQ, we first need to understand AMQP and its components, which the next section covers.
AMQP
AMQP is a TCP-based protocol that governs the communication between the sender and the broker, and between the broker and the receiver.
The AMQP architecture introduces some abstractions that are a little more complex than just a sender and a receiver. The sender is referred to as the publisher and the receiver as the consumer. Between the publisher and the consumer sits our broker server (RabbitMQ), which plays the role of a post office.
For the communication between publisher and broker, and between consumer and broker, AMQP introduces channels, which are the broker's communication gates. When a server (publisher or consumer) wants to talk to the broker, it opens an AMQP connection. If the same server needs another independent line of communication, it doesn't have to open a second connection; it can open another channel within the same connection. This is mainly an optimization, because TCP connections are expensive to set up and keep open.
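As a rough sketch of the connection/channel relationship, the snippet below opens one connection and two channels on it. It assumes the RabbitMQ.Client NuGet package with its 6.x API (the same one used later in this article) and a broker running on localhost; the class and variable names are made up for illustration.

```csharp
using System;
using RabbitMQ.Client;

class ChannelsSketch
{
    static void Main()
    {
        // Assumption: a RabbitMQ broker is reachable on localhost with default credentials
        var factory = new ConnectionFactory() { HostName = "localhost" };

        // One TCP connection to the broker...
        using (var connection = factory.CreateConnection())
        // ...shared by two lightweight channels (hypothetical names)
        using (var publishChannel = connection.CreateModel())
        using (var consumeChannel = connection.CreateModel())
        {
            // Each channel gets its own number within the shared connection
            Console.WriteLine("Publish channel number: {0}", publishChannel.ChannelNumber);
            Console.WriteLine("Consume channel number: {0}", consumeChannel.ChannelNumber);
        }
    }
}
```

Using one channel for publishing and another for consuming is only one possible split; the point is that both travel over a single TCP connection.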
Inside the broker server, there are 3 main abstractions:
- Exchanges: These receive messages from the publisher (the department inside the post office where you hand over a letter to be sent).
- Queues: This is where you get your messages as a consumer (the mailbox in front of your house).
- Bindings: These are the rules (contracts) of communication between exchanges and queues. Each queue defines them to tell the broker which messages it wants from the exchange it is connected to.
AMQP is programmable, which means it lets the user declare its entities (exchanges and queues) and define the bindings between them. You can read more about it here.
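To make "programmable" a bit more concrete, here is a minimal sketch that declares an exchange and a queue, binds them, and publishes one message. It assumes the RabbitMQ.Client NuGet package (6.x API) and a broker on localhost; the exchange, queue and key names are hypothetical and chosen only for this illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class TopologySketch
{
    static void Main()
    {
        // Assumption: a RabbitMQ broker is reachable on localhost
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Hypothetical names, not taken from the article
            const string exchangeName = "orders";
            const string queueName = "orders.email";
            const string bindingKey = "order.created";

            // Declare the exchange (where publishers send messages)
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);

            // Declare the queue (where consumers read messages)
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false,
                                 autoDelete: false, arguments: null);

            // Declare the binding (which messages the queue wants from the exchange)
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: bindingKey);

            // Publish a message whose routing key matches the binding key
            var body = Encoding.UTF8.GetBytes("order 42 created");
            channel.BasicPublish(exchange: exchangeName, routingKey: bindingKey,
                                 basicProperties: null, body: body);

            Console.WriteLine("Published one message to '{0}'", exchangeName);
        }
    }
}
```

Declarations like these are idempotent as long as the arguments stay the same, so both publishers and consumers can safely declare the entities they rely on at startup.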
Exchange Types
When an exchange receives a message, it routes the message to one or more queues. Several factors govern the routing. The first is the type of the exchange itself, which determines how RabbitMQ matches messages to queues. There are 4 types of exchanges:
- Fanout exchange: This exchange type just broadcasts all the messages it receives to all the queues it knows about. This is perfect for a publisher/subscriber system, like a global notifier in multi-player games. A complete example of a pub/sub system can be found here.
- Direct exchange: This exchange type delivers a message only to the queues whose binding key, declared when the queue is bound to the exchange, exactly matches the message's routingKey.
- Topic exchange: This exchange type offers more flexible filtering on the routingKey. The routing key must be a list of words delimited by dots, and the binding pattern it is matched against can contain 2 special characters: '*', which substitutes for exactly one word, and '#', which substitutes for zero or more words.
- Headers exchange: This exchange type routes messages on multiple attributes. It ignores the routingKey and routes using the message's headers attribute instead. Because header values don't have to be strings, the value being matched can be, for example, an integer or a hash.
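The topic wildcards are easier to see in action. The following is a small in-memory sketch of the matching rule described above; it does not talk to a broker and is not RabbitMQ's own implementation. The TopicMatcher class and the example keys are hypothetical.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when the dot-delimited routing key matches the binding pattern.
    // '*' stands for exactly one word, '#' for zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: it matches only if the key is exhausted too
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' may absorb zero or more words: try every possible split
            for (int next = j; next <= k.Length; next++)
            {
                if (Match(p, i + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against
        if (j == k.Length) return false;

        // '*' accepts any single word; otherwise the words must be equal
        if (p[i] == "*" || p[i] == k[j]) return Match(p, i + 1, k, j + 1);
        return false;
    }

    public static void Main()
    {
        Console.WriteLine(Matches("logs.*.error", "logs.auth.error")); // True
        Console.WriteLine(Matches("logs.*.error", "logs.error"));      // False
        Console.WriteLine(Matches("logs.#", "logs"));                  // True
        Console.WriteLine(Matches("logs.#", "logs.auth.error"));       // True
        Console.WriteLine(Matches("#.error", "logs.auth.warning"));    // False
    }
}
```

Note how "logs.*.error" rejects "logs.error" because '*' must consume exactly one word, while "logs.#" accepts "logs" because '#' may consume none.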
A Taste of RabbitMQ
In this section, we will use RabbitMQ as an RPC system. This example is fully described here in the official docs. I tweaked it a little bit.
An RPC (remote procedure call) just means that we want to execute a function on some remote server. If we are going to execute a function, then most likely we will want to receive the result, or at least be notified that the function was executed. Now you can see that RabbitMQ isn't behaving as just a message queue but also as a communication interface between 2 servers, or more specifically, services.
In a micro-services architecture, this can be a great way of connecting your services. Your RPC client sends a message along with the address of a queue, called the callback queue, which will receive the RPC server's response and deliver it back to the RPC client. Sounds great, right?
Not exactly! Clearly we don't want to create a new queue every time a service wants to communicate with another. Why not reuse the same queue many times? For that, we need a way to identify which response message belongs to which request.
We can simply attach an attribute to the message whose value is unique per request. This is the CorrelationId attribute.
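To illustrate how one callback queue can serve many requests, here is a broker-free sketch of the bookkeeping a client might do with correlation ids. The PendingReplies class and its method names are hypothetical and are not part of the RabbitMQ client library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class PendingReplies
{
    // Maps each outstanding CorrelationId to the task awaiting its reply
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Registers a new request and returns its unique correlation id
    public string Register(out Task<string> reply)
    {
        var correlationId = Guid.NewGuid().ToString();
        var completion = new TaskCompletionSource<string>();
        pending[correlationId] = completion;
        reply = completion.Task;
        return correlationId;
    }

    // Would be called for every message that arrives on the shared callback queue.
    // Returns false for an unknown id, so the caller can discard the message.
    public bool Complete(string correlationId, string body)
    {
        if (pending.TryRemove(correlationId, out var completion))
        {
            completion.SetResult(body);
            return true;
        }
        return false;
    }
}

public static class CorrelationDemo
{
    public static void Main()
    {
        var replies = new PendingReplies();
        var idA = replies.Register(out var replyA);
        var idB = replies.Register(out var replyB);

        // Replies may arrive in any order on the same queue
        replies.Complete(idB, "result for B");
        replies.Complete(idA, "result for A");

        Console.WriteLine(replyA.Result);                           // result for A
        Console.WriteLine(replyB.Result);                           // result for B
        Console.WriteLine(replies.Complete("unknown-id", "stray")); // False
    }
}
```

In a real client, Complete would be invoked from the consumer's Received handler with the CorrelationId read from the incoming message's properties.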
So a bird's-eye view of the process looks something like this:
- There will be a direct exchange called "RPC" handling all RPC requests.
- When any Client starts up, it declares 2 queues: one named after the service, for receiving RPC requests from other services, and one anonymous exclusive callback queue, for receiving the responses to its own RPC requests to other services.
- To make an RPC request, the Client publishes a message to the "RPC" exchange, using the name of the requested service as the routingKey. The message has two properties: ReplyTo, which is set to the callback queue, and CorrelationId, which is set to a unique value for every request.
- The exchange routes the request to the queue whose name is the requested service name.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the ReplyTo property.
- The client waits for data on the callback queue. When a message appears, it checks the CorrelationId property. If it matches the value from the request, it returns the response to the application.
The following image explains the whole process. In our version, "rpc_queue" is replaced with the "SERVICE_NAME" queue, which is bound to the exchange "RPC" with a routingKey equal to "SERVICE_NAME".
An example of an RPC request will be similar to this:
channel.BasicPublish(
    exchange: "RPC",             // The global exchange for RPC
    routingKey: "SERVICE_NAME",  // The service we are requesting
    basicProperties: props,      // Holds ReplyTo and CorrelationId
    body: messageBytes);         // Parameters to the RPC function, as bytes; here we send a number to get its Fibonacci value
Now the fun part, the code. I used the C# .NET example code but feel free to check the site for your language of choice. I commented almost every line of the code below. Note: I didn't repeat most of the comments in the client code because it is the same as the server.
The RPC server code:
using System;
using System.Text;
using RabbitMQ.Client;        // The NuGet package
using RabbitMQ.Client.Events; // The event handlers that tell us when we get a new message (RPC request)

class RPCServer
{
    public static void Main()
    {
        // Create a connection factory for a RabbitMQ server that resides on the same machine
        // (ConnectionFactory follows the factory design pattern)
        var factory = new ConnectionFactory() { HostName = "localhost" };

        // Open a connection and create a channel for the communication
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the direct exchange that handles all RPC requests
            channel.ExchangeDeclare(exchange: "RPC", type: ExchangeType.Direct);

            // Declare a queue to receive requests
            channel.QueueDeclare(queue: "SERVICE_NAME", durable: false, exclusive: false,
                                 autoDelete: false, arguments: null);

            // Bind the queue to the exchange, using the service name as the routing key
            channel.QueueBind(queue: "SERVICE_NAME", exchange: "RPC", routingKey: "SERVICE_NAME");

            // This makes the broker distribute jobs fairly (see Fair Dispatch under Reliability)
            channel.BasicQos(0, 1, false);

            // Create an event listener to fire when we receive a request
            var consumer = new EventingBasicConsumer(channel);

            // Callback to be called when we receive an RPC request
            consumer.Received += (model, ea) =>
            {
                string response = null;

                // Extract message body
                var body = ea.Body.ToArray();

                // Extract ReplyTo and CorrelationId properties
                var props = ea.BasicProperties;

                // Create a properties instance for the reply
                var replyProps = channel.CreateBasicProperties();

                // Set the CorrelationId of the reply
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    // Decode the body from binary to string
                    var message = Encoding.UTF8.GetString(body);

                    // Convert string into integer
                    int n = int.Parse(message);

                    // Console feedback
                    Console.WriteLine(" [.] fib({0})", message);

                    // CALL THE RPC FUNCTION WITH THE REQUESTED VALUE
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    // Catch in case of failure
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    // Encode response into the binary form
                    var responseBytes = Encoding.UTF8.GetBytes(response);

                    // Publish the RPC response. The callback queue is anonymous and is not
                    // bound to the "RPC" exchange, so we use the default exchange (""),
                    // which routes a message to the queue named by its routing key
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                                         basicProperties: replyProps, body: responseBytes);

                    // Manual acknowledgement that we received the message and
                    // processed it (see Message acknowledgements under Reliability)
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            };

            // Start consuming, with auto acknowledgement turned off
            channel.BasicConsume(queue: "SERVICE_NAME", autoAck: false, consumer: consumer);

            // Console feedback
            Console.WriteLine(" [x] Awaiting RPC requests");
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    // This is the Fibonacci implementation.
    // Assumes only valid positive integer input.
    // Don't expect this one to work for big numbers, and it's
    // probably the slowest recursive implementation possible.
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }
        return fib(n - 1) + fib(n - 2);
    }
}
The RPC client code:
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

/* To illustrate how an RPC service could be used, we're going to create a
   simple client class. It exposes a method named Call which sends an RPC
   request and blocks until the answer is received. It is named RpcClient. */
public class RpcClient
{
    // By .NET convention, a type whose name begins with a capital "I" is an interface
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();

    // CorrelationId of the request we are currently waiting on
    private volatile string correlationId;

    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();

        // Make sure the "RPC" exchange exists before we publish to it
        channel.ExchangeDeclare(exchange: "RPC", type: ExchangeType.Direct);

        // Anonymous exclusive callback queue; the broker generates its name
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);

            // Making sure the CorrelationId matches ours
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };

        // Start consuming from the callback queue once, before any request is sent
        channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true);
    }

    public string Call(string message)
    {
        // A unique CorrelationId for every request
        correlationId = Guid.NewGuid().ToString();
        var props = channel.CreateBasicProperties();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "RPC",
            routingKey: "SERVICE_NAME",
            basicProperties: props,
            body: messageBytes);

        // Block until the matching response arrives
        return respQueue.Take();
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();
        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");
        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}
Reliability
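- Durability: For messages to survive a broker restart, we need to declare the queue as durable and mark the messages as persistent. This tells RabbitMQ to save the messages to disk instead of keeping them only in RAM.
// We declare the queue as durable (and neither exclusive nor auto-deleted)
channel.QueueDeclare(queue: "QUEUE_NAME", durable: true, exclusive: false, autoDelete: false, arguments: null);
// We mark the messages as persistent through their properties
var props = channel.CreateBasicProperties();
props.Persistent = true;
// props must then be passed as basicProperties when publishing each message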
- Message acknowledgements: We send the ack only after the task is processed. This way, even if a worker dies before fully processing a task, we won't lose it, because RabbitMQ will re-queue it.
// We manually send the ack after we process the task in the callback
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// We turn off auto acknowledgment because we are doing it manually
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
- Fair Dispatch: This distributes jobs fairly between consumers (if there is more than one): instead of pushing tasks to a consumer right away, RabbitMQ gives it a new task only after it has finished and acknowledged the previous one.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
RabbitMQ Clients
After you install RabbitMQ, either locally or in a Docker container, you can use the client for your language of choice to interact with it from your application. There are clients for many languages. You can find more about them here.
The official docs also provide excellent getting-started tutorials in many languages.