Using Worker verticles in Vert.x
Running blocking code has always been tricky in microservice architectures. This is especially true for services that act as intermediaries, where achieving high capacity and good performance while churning through requests is rarely straightforward.
Vert.x provides a flexible architecture that drives request processing off the event loop. But we must not block the event loop with long-running tasks or any kind of processing that ties up the event loop thread. When we have to call a synchronous API or interact with another application over the network, the goal is to run such tasks on a scheduler or on worker threads. In my previous article on workers, we saw how to schedule such tasks on a thread pool and take advantage of RxJava Observables to subscribe to the results.
Let’s take the next step forward to improve our design with the use of Worker verticles.
A worker verticle is just like a standard verticle, but it’s executed using a thread from the Vert.x worker thread pool, rather than using an event loop. Worker verticles are designed for calling blocking code, as they won’t block any event loops. Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can be executed by different threads at different times.
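As a sketch, a worker verticle looks just like an ordinary verticle; the class body below is illustrative, and the blocking call is a hypothetical stand-in for whatever synchronous work you need to do:

```java
import io.vertx.core.AbstractVerticle;

// Illustrative sketch: because this verticle is deployed with
// setWorker(true), its handlers run on a worker thread, so blocking
// here does not stall any event loop.
public class ApiWorkerVerticle extends AbstractVerticle {

  @Override
  public void start() {
    vertx.eventBus().consumer("WORKER", message -> {
      // Safe to block: we are on a worker thread, not an event loop.
      String result = callBlockingApi((String) message.body());
      message.reply(result);
    });
  }

  // Hypothetical placeholder for a synchronous call (e.g. JDBC, a legacy HTTP client).
  private String callBlockingApi(String id) {
    try {
      Thread.sleep(500); // simulate a slow synchronous call
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "{\"id\":\"" + id + "\"}";
  }
}
```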
Deploying a worker verticle
Deploying a worker verticle is similar to deploying a normal verticle, except that we specify a few deployment options.
private void addWorker() {
  DeploymentOptions workerOpts = new DeploymentOptions()
      .setConfig(config)
      .setWorker(true)
      .setInstances(1)
      .setWorkerPoolSize(1);

  vertx.deployVerticle(ApiWorkerVerticle.class.getName(), workerOpts, res -> {
    if (res.failed()) {
      m_logger.error("Failed to deploy worker verticle {}",
          ApiWorkerVerticle.class.getName(), res.cause());
    } else {
      String depId = res.result();
      m_deployed_verticles.add(depId);
      m_logger.info("Deployed verticle {} DeploymentID {}",
          ApiWorkerVerticle.class.getName(), depId);
    }
  });
}
Communicating with Worker Verticle
All communication with worker verticles happens through the event bus. The main verticle sends a request message to a worker verticle on a well-known address, and the response is sent back either as a reply on the same address or as a message on a different one.
EventBus Producer
The event bus offers two modes of communication: send/receive (point-to-point) and publish (broadcast).
- send — Point-to-point, two-way communication that allows us to set a reply handler to process the response from our worker.
- publish — Fire-and-forget communication that can be used when you don’t care about a response. You are simply publishing an event that you need your workers to be aware of. Once the worker completes the task, it can publish the result on another address, and we can use request IDs to link each request to its response.
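The two modes above can be sketched as follows; the addresses `WORKER` and `WORKER.RESULTS` are illustrative, not part of any Vert.x API:

```java
// send: point-to-point; one consumer receives the message,
// and we can attach a reply handler for the response.
vertx.eventBus().send("WORKER", "request-42", reply -> {
  if (reply.succeeded()) {
    System.out.println("Worker replied: " + reply.result().body());
  }
});

// publish: broadcast to every consumer registered on the address; no reply handler.
vertx.eventBus().publish("WORKER", "something-happened");

// With publish, a worker can post its result on a separate address,
// carrying a request ID so the producer can correlate request and response.
vertx.eventBus().consumer("WORKER.RESULTS", msg -> {
  System.out.println("Result for request: " + msg.body());
});
```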
Messages travel over the wire as buffers. Out of the box, Vert.x can send common types such as String, Buffer, and JsonObject; for any other type, Vert.x looks up a MessageCodec that defines how to serialize the object to and from a buffer. In our API verticle, we can now write our producer code as follows:
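For completeness, here is a sketch of such a codec; the `ApiRequest` POJO and the codec class name are hypothetical, and the codec simply round-trips the object through JSON:

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.Json;

// Hypothetical POJO we want to send over the event bus.
class ApiRequest {
  public String id;
  public String payload;
}

class ApiRequestCodec implements MessageCodec<ApiRequest, ApiRequest> {

  @Override
  public void encodeToWire(Buffer buffer, ApiRequest req) {
    Buffer json = Json.encodeToBuffer(req);
    buffer.appendInt(json.length()); // length prefix so decode knows where to stop
    buffer.appendBuffer(json);
  }

  @Override
  public ApiRequest decodeFromWire(int pos, Buffer buffer) {
    int length = buffer.getInt(pos);
    return Json.decodeValue(buffer.getBuffer(pos + 4, pos + 4 + length), ApiRequest.class);
  }

  @Override
  public ApiRequest transform(ApiRequest req) {
    // Local (same-JVM) delivery skips the wire format entirely.
    return req;
  }

  @Override
  public String name() {
    return ApiRequestCodec.class.getName();
  }

  @Override
  public byte systemCodecID() {
    return -1; // always -1 for user codecs
  }
}
```

The codec is then registered once, for example in a verticle’s start() method, with `vertx.eventBus().registerDefaultCodec(ApiRequest.class, new ApiRequestCodec())`.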
private void getOne(RoutingContext rc) {
  HttpServerResponse response = rc.response();
  String id = rc.request().getParam("id");
  MessageProducer<String> producer = vertx.eventBus().sender("WORKER");
  producer.send(id, result -> {
    if (result.succeeded()) {
      response
          .setStatusCode(200)
          .putHeader("content-type", "application/json; charset=utf-8")
          .end((String) result.result().body());
    } else {
      response
          .setStatusCode(404)
          .putHeader("content-type", "application/json; charset=utf-8")
          .end();
    }
  });
}
Consuming EventBus messages by Worker Verticle
A worker verticle is structured just like any other verticle, except that it registers an event bus consumer to listen for messages and reply to them.
MessageConsumer<String> consumer = vertx.eventBus().consumer("WORKER");
consumer.handler(m -> {
  String id = m.body();
  m_api.fetchResource(id)
      .subscribeOn(m_scheduler)
      .observeOn(m_scheduler)
      .subscribe(
          r -> m.reply(Json.encode(r)),
          e -> m.fail(0, "API Error"),
          () -> {});
});
We now have an application architecture that can scale, react to load, and remain highly responsive to inbound requests. We can use it to offload any long-running synchronous task to the workers and continue serving other inbound traffic without being blocked by previous requests.