RabbitMQ EventBus system
Linagora Vietnam Recruiter
LINAGORA is a pioneering French group working in the field of Open Source Software.
The reason we needed a new EventBus System.
I am a developer on the Apache James project, where we are building a distributed mail server. James already had an event system, used to deliver events to the components that handle them. Those components, called MailboxListener in our system, either handle notifications (JMAP push notifications, IMAP IDLE notifications…) or extend the capabilities of the James mailbox (MDA): SpamAssassin to handle spam, indexing mails in ElasticSearch to serve mail search, and so on. But this system was bound to a single JVM. In other words, it was not distributed: an event generated on one James server would not be dispatched to another James server.
To make James truly distributed, we needed an event system that can deliver events across James instances.
What can an EventBus do?
So what characteristics does the new event system need in order to be distributed?
- A publisher component is in charge of publishing events without caring where they are consumed. Events are delivered to global listeners (which receive every event) and to key-registered listeners (which receive only the events they are interested in).
- A consumer component is in charge of consuming events without caring where the events came from.
- An event is delivered to every listener interested in it, across James instances.
- Any James server can join or leave the event system without any problem.
Given those points, our future event system really looks like an EventBus.
Retrying in EventBus? When? When not?
That is the EventBus you would hope to have in a perfect world where no error ever happens. Let's come back to reality, where error handling is an essential part of every event delivery system.
First, retrying mechanism
If a listener fails to execute an event, the event should be handled again by the same listener on another James server. Why another James server? The answer is simple: we want to minimize the impact of outside factors on our listeners. For example, if a listener has failed to execute an event, the cause could be a network, memory or hard disk issue on that machine. Retrying in the same place would likely produce the same result; instead, we delegate the execution to the same listener running elsewhere, which gives the event a better chance of being executed successfully.
On top of that, retrying after the same fixed interval every time is not ideal. We apply exponential backoff to the retry process: the delay between retries is randomized and grows exponentially with the number of attempts.
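To make the idea concrete, here is a minimal sketch of an exponential backoff with random jitter. It is not the James implementation: the class name ExponentialBackoffSketch, the firstBackoff and jitterFactor parameters and the doubling factor are all assumptions chosen for illustration.

```java
import java.time.Duration;
import java.util.Random;

/** Hypothetical helper, not part of James: computes the wait before a retry. */
final class ExponentialBackoffSketch {
    private final Duration firstBackoff; // assumed delay before the first retry
    private final double jitterFactor;   // assumed randomisation ratio, between 0.0 and 1.0
    private final Random random;

    ExponentialBackoffSketch(Duration firstBackoff, double jitterFactor, Random random) {
        if (jitterFactor < 0.0 || jitterFactor > 1.0) {
            throw new IllegalArgumentException("jitterFactor must be between 0.0 and 1.0");
        }
        this.firstBackoff = firstBackoff;
        this.jitterFactor = jitterFactor;
        this.random = random;
    }

    /** Delay without randomisation: firstBackoff * 2^retryCount. */
    Duration baseDelay(int retryCount) {
        if (retryCount < 0 || retryCount > 30) {
            throw new IllegalArgumentException("retryCount must be between 0 and 30");
        }
        return firstBackoff.multipliedBy(1L << retryCount);
    }

    /** Randomised delay, uniform in [base - base * jitter, base + base * jitter]. */
    Duration delay(int retryCount) {
        long baseMillis = baseDelay(retryCount).toMillis();
        long spread = (long) (baseMillis * jitterFactor);
        // nextDouble() is in [0, 1), so the offset falls in [-spread, +spread].
        long offset = (long) (random.nextDouble() * (2 * spread + 1)) - spread;
        return Duration.ofMillis(baseMillis + offset);
    }
}
```

With a first backoff of 100 ms and a jitter factor of 0.5, the third attempt (retryCount 2) would wait somewhere between 200 ms and 600 ms. The randomisation keeps several failing listeners from retrying at exactly the same moment.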
Second, at least once in messaging terminology
Luckily, at-least-once delivery is supported by RabbitMQ. If a listener on the first James server is handling an event and that server unexpectedly goes down, the EventBus system has to notice that the consumer has stopped and re-deliver the event to another listener. With manual acknowledgements, RabbitMQ requeues a message whose consumer disconnected before acknowledging it.
Third, even when retrying could not help
When the retry process exceeds the maximum number of retries you allow, you can guess that the event will keep failing if we keep retrying it. Maybe there is a bug in the code or a hardware problem, and it needs to be resolved before the failed events are executed again. Our current approach is to store such events in a Cassandra table, so that an admin can trigger a new delivery once the problem is fixed.
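The flow of "retry up to a limit, then park the event for later" can be sketched as follows. This is an in-memory illustration only: RetryThenStoreSketch is a hypothetical class, the list stands in for the Cassandra table, and the waiting and cross-server redelivery steps are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: bounded retries, then park the event for an admin. */
final class RetryThenStoreSketch<E> {
    private final int maxRetries;
    // Stand-in for the Cassandra table mentioned in the text.
    private final List<E> deadLetters = new ArrayList<>();

    RetryThenStoreSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns true when the listener eventually succeeded. */
    boolean deliver(E event, Consumer<E> listener) {
        // One initial attempt plus at most maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                listener.accept(event);
                return true;
            } catch (RuntimeException e) {
                // A real system would wait (backoff) and redeliver elsewhere here.
            }
        }
        deadLetters.add(event);
        return false;
    }

    /** Snapshot of the events waiting for a manual redelivery. */
    List<E> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    /** Admin-triggered redelivery; returns how many events succeeded this time. */
    int redeliverAll(Consumer<E> listener) {
        List<E> pending = new ArrayList<>(deadLetters);
        deadLetters.clear();
        int delivered = 0;
        for (E event : pending) {
            if (deliver(event, listener)) {
                delivered++;
            }
        }
        return delivered;
    }
}
```

An event that still fails during redeliverAll goes back into the store, so nothing is lost while the underlying problem remains.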
How do we implement it?
Introduce the API
The EventBus is able to register listeners and deliver events to registered listeners. As listed above, there are two types of registration: global registration, and key registration for listeners that are only interested in specific events. A registered listener is able to unregister later. So we proposed an EventBus API like this:
    public interface EventBus {
        Registration register(MailboxListener listener, RegistrationKey key);

        Registration register(MailboxListener listener, Group group) throws GroupAlreadyRegistered;

        Mono<Void> dispatch(Event event, Set<RegistrationKey> key);
    }

    public class Group {
    }

    public interface Registration {
        void unregister();
    }

    public interface RegistrationKey {
        String asString();
    }
Global registration is expressed in code as Group registration: every global listener registers under a group. All of them receive every event dispatched to the EventBus by
EventBus.dispatch(Event event, Set<RegistrationKey> keys)
With key registration, a listener registered with a specific key only receives the events dispatched with a matching RegistrationKey in the set passed to
EventBus.dispatch(Event event, Set<RegistrationKey> keys)
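The difference between the two registration types is easier to see in a single-JVM toy version. The sketch below is not the James API: to stay self-contained it uses plain Strings for events, groups and keys, and Consumer<String> in place of MailboxListener; InMemoryEventBusSketch and its method names are assumptions.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Simplified, single-JVM stand-in for the EventBus contract. */
final class InMemoryEventBusSketch {
    interface Registration {
        void unregister();
    }

    // One listener per group: a second registration of the same group is refused.
    private final Map<String, Consumer<String>> groupListeners = new ConcurrentHashMap<>();
    // Any number of listeners per key.
    private final Map<String, List<Consumer<String>>> keyListeners = new ConcurrentHashMap<>();

    Registration registerGroup(String group, Consumer<String> listener) {
        if (groupListeners.putIfAbsent(group, listener) != null) {
            throw new IllegalStateException("Group already registered: " + group);
        }
        return () -> groupListeners.remove(group);
    }

    Registration registerKey(String key, Consumer<String> listener) {
        keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
        return () -> {
            List<Consumer<String>> listeners = keyListeners.get(key);
            if (listeners != null) {
                listeners.remove(listener);
            }
        };
    }

    void dispatch(String event, Set<String> keys) {
        // Group listeners receive every event, whatever the keys are.
        groupListeners.values().forEach(listener -> listener.accept(event));
        // Key listeners receive the event only when one of their keys is in the set.
        for (String key : keys) {
            keyListeners.getOrDefault(key, Collections.emptyList())
                .forEach(listener -> listener.accept(event));
        }
    }
}
```

A listener registered under a group sees both an event dispatched with key "mailbox-1" and one dispatched with key "mailbox-2", while a listener registered on "mailbox-1" sees only the first.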
RabbitMQEventBus design
Multiple James servers connect to global queues for global registration
To achieve global registration, we create a queue for each registered group. Queue names are derived from the group class name, to distinguish them from other group registrations:
    static class WorkQueueName {
        @VisibleForTesting
        static WorkQueueName of(Group group) {
            return new WorkQueueName(group);
        }

        static final String MAILBOX_EVENT = "mailboxEvent";
        static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";

        private final Group group;
        private final String name;

        private WorkQueueName(Group group) {
            Preconditions.checkNotNull(group, "Group must be specified");
            this.group = group;
            this.name = groupName(group.getClass());
        }

        public Group getGroup() {
            return group;
        }

        String asString() {
            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
        }
    }
For a class QuotaUpdateGroup that extends Group, the corresponding queue name would be:
mailboxEvent-workQueue-full.qualified.domain.name.QuotaUpdateGroup
This queue is shared by many consumers across James instances. Once an event is dispatched to this queue, only one of the consumers listening on it consumes that event. That is the behaviour we want for group registration: when an event arrives, executing it once is enough. Executing it multiple times would lead to the same result while using more hardware resources.
Queue declaration and consumption are performed with the APIs of the reactor-rabbitmq library. The queue is bound to the global exchange with the empty string as routing key, which is shared by all global registrations:
    class GroupRegistration implements Registration {
        private final Sender sender;
        private final WorkQueueName queueName;
        private Optional<Disposable> receiverSubscriber;

        GroupRegistration(Sender sender, Group group) {
            this.queueName = WorkQueueName.of(group);
            this.sender = sender;
            this.receiverSubscriber = Optional.empty();
        }

        GroupRegistration start() {
            createGroupWorkQueue()
                .then(retryHandler.createRetryExchange())
                .doOnSuccess(any -> this.subscribeWorkQueue())
                .block();
            return this;
        }

        private Mono<Void> createGroupWorkQueue() {
            return Flux.concat(
                sender.declareQueue(QueueSpecification.queue(queueName.asString())
                    .durable(true)
                    .exclusive(false)
                    .autoDelete(false)
                    .arguments(NO_ARGUMENTS)),
                sender.bind(BindingSpecification.binding()
                    .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                    .queue(queueName.asString())
                    .routingKey(EMPTY_ROUTING_KEY)))
                .then();
        }

        private void subscribeWorkQueue() {
            receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString())
                .subscribeOn(Schedulers.parallel())
                .filter(delivery -> Objects.nonNull(delivery.getBody()))
                .flatMap(this::deliver)
                .subscribeOn(Schedulers.elastic())
                .subscribe());
        }

        private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) { ... }

        @Override
        public void unregister() { ... }
    }
The queue is declared as durable, non-exclusive and non-auto-delete. It can therefore be shared with consumers on different James servers, it is not deleted when all consumers stop listening, and it is able to recover the messages it holds if RabbitMQ stops unexpectedly.
With global delivery, successful execution is an essential concern because global listeners do important work. When a global listener fails to rename a mailbox, delete a mailbox or update the current quota of a user, we consider it a critical issue, so such failures have to be handled. We therefore apply retrying to global delivery: the failed event is redelivered to the same listener across James servers until it succeeds or the number of retries exceeds the maximum. In that worst case, storing the event in a Cassandra table is the last step of our error handling procedure.
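The "only one consumer gets each event" behaviour of a shared work queue can be illustrated with a small in-memory model. This is only a sketch: WorkQueueSketch is a hypothetical class, the round-robin hand-off is a simplification of what a broker does, and a real queue would hold messages until a consumer is available instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of one group queue shared by consumers on several servers. */
final class WorkQueueSketch {
    private final List<Consumer<String>> consumers = new ArrayList<>();
    private int next = 0;

    /** Attaches one more consumer, for example the same listener on another server. */
    void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
    }

    /** Hands the event to exactly one consumer, taking turns between them. */
    void publish(String event) {
        if (consumers.isEmpty()) {
            // Simplification: a real queue would keep the message instead.
            throw new IllegalStateException("no consumer attached");
        }
        consumers.get(next).accept(event);
        next = (next + 1) % consumers.size();
    }
}
```

With two consumers attached and four events published, each consumer handles two events and no event is handled twice.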
Multiple James servers connect to queues for key registration
For key registration, once an event is dispatched with a specific key, all listeners registered with that key across James instances should receive the event. This is one difference from group registration. To achieve it, on each James server, when a listener is registered with a key, we create an exclusive queue so that it is consumed by a single consumer and not shared with others. The queue is created with a random name and bound to the global exchange with a routing key derived from the RegistrationKey. This is another difference from global registration, where the routing key is an empty string. We wrote a class that converts a RegistrationKey to a routing key and back. The logic is quite simple: for a RegistrationKey instance, the corresponding routing key would be:
full.qualified.domain.name.ClassNameOfRegistrationKey + ":" + registrationKey.asString()
Queue declaration and consumption are much the same as for group registration, apart from a few queue properties specific to key registration.
Key registration is used by notification listeners. Unlike with global listeners, if an event delivery fails, we consider the problem not critical enough to apply error handling.
Dispatching an event is now quite easy. First, publish the event to group registration consumers, which are connected to the global exchange through the empty routing key. Second, publish it to key registration consumers, which are connected to the global exchange through the routing keys computed from the RegistrationKey set passed as the second parameter of dispatch(). Here is the main part of our dispatching logic, covering both group consumers and key consumers:
    public class EventDispatcher {
        ...

        private Mono<Void> doDispatch(Mono<byte[]> serializedEvent, Set<RegistrationKey> keys) {
            Flux<RoutingKeyConverter.RoutingKey> routingKeys = Flux.concat(
                // produce empty routing key
                Mono.just(RoutingKeyConverter.RoutingKey.empty()),
                // produce list of routing key computed from Set<RegistrationKey>
                Flux.fromIterable(keys).map(RoutingKeyConverter.RoutingKey::of));

            // create rabbitmq messages with the body is byte array serialized
            // from event with each routing key computed above
            Flux<OutboundMessage> outboundMessages = routingKeys
                .flatMap(routingKey -> serializedEvent
                    .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, payload)));

            // sending multiple messages to rabbitmq
            return sender.send(outboundMessages);
        }

        ...
    }
We can now dispatch events to the right queues using routing keys computed from a set of RegistrationKey. The messages arrive in the queues and are consumed by listeners on different James servers. This is how our distributed EventBus works.
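A two-way conversion of this kind could look like the sketch below. It is an illustration under assumptions, not the James converter: RoutingKeySketch, MailboxKey and the factory registry are hypothetical, and the RegistrationKey interface is redeclared only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Local copy of the interface shown earlier, so the sketch compiles on its own. */
interface RegistrationKey {
    String asString();
}

/** Hypothetical key type used for the example. */
final class MailboxKey implements RegistrationKey {
    private final String mailboxId;

    MailboxKey(String mailboxId) {
        this.mailboxId = mailboxId;
    }

    @Override
    public String asString() {
        return mailboxId;
    }
}

final class RoutingKeySketch {
    private static final String SEPARATOR = ":";
    // Class name -> function rebuilding a key from its string form (assumed design).
    private final Map<String, Function<String, RegistrationKey>> factories = new HashMap<>();

    void registerFactory(Class<? extends RegistrationKey> type,
                         Function<String, RegistrationKey> factory) {
        factories.put(type.getName(), factory);
    }

    /** Fully qualified class name + ":" + key.asString(). */
    static String toRoutingKey(RegistrationKey key) {
        return key.getClass().getName() + SEPARATOR + key.asString();
    }

    /** Splits on the first separator, so the key value may itself contain ':'. */
    RegistrationKey toRegistrationKey(String routingKey) {
        int index = routingKey.indexOf(SEPARATOR);
        if (index < 0) {
            throw new IllegalArgumentException("Not a routing key: " + routingKey);
        }
        String className = routingKey.substring(0, index);
        Function<String, RegistrationKey> factory = factories.get(className);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown key type: " + className);
        }
        return factory.apply(routingKey.substring(index + 1));
    }
}
```

Prefixing the value with the class name lets the receiving side pick the right key type when it turns a routing key back into a RegistrationKey.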
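The routing described above can be modelled in memory to check the expected outcome. The sketch below assumes exact-match routing on the routing key and represents each queue as a plain list; DirectExchangeSketch and its methods are hypothetical names, not part of James or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy exchange: a message goes to every queue bound with the same routing key. */
final class DirectExchangeSketch {
    static final String EMPTY_ROUTING_KEY = "";

    // Routing key -> queues bound with that key; a queue is just a list of payloads.
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    /** Creates a queue, binds it with the routing key and returns it. */
    List<String> bindQueue(String routingKey) {
        List<String> queue = new ArrayList<>();
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
        return queue;
    }

    void publish(String routingKey, String payload) {
        bindings.getOrDefault(routingKey, Collections.emptyList())
            .forEach(queue -> queue.add(payload));
    }

    /** The empty key (for group queues) followed by one routing key per registration key. */
    static List<String> routingKeysFor(Set<String> registrationKeys) {
        List<String> routingKeys = new ArrayList<>();
        routingKeys.add(EMPTY_ROUTING_KEY);
        routingKeys.addAll(registrationKeys);
        return routingKeys;
    }

    /** Publishes one message per routing key, mirroring the dispatch logic. */
    void dispatch(String payload, Set<String> registrationKeys) {
        for (String routingKey : routingKeysFor(registrationKeys)) {
            publish(routingKey, payload);
        }
    }
}
```

If a group queue is bound with the empty key and two servers each bind their own queue with key "k1", dispatching an event with the set {"k1"} puts one copy in the group queue and one copy in each of the two key queues. A queue bound with another key receives nothing.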
Wrapping up
I hope this article helps you understand the problems we faced and how they led us to design, from scratch, an EventBus system on top of RabbitMQ with different delivery types and error handling.
By Mr. Tran Tien Duc, Java Team, Linagora Vietnam
Thanks to Benoit Tellier, Raphaël Ouazana, and Rene Cordier.