Implementing the Dead Letter Channel Integration Pattern with MuleSoft and RabbitMQ
In today's interconnected world, seamless communication between disparate systems and applications is crucial for the smooth functioning of businesses. Messaging systems play a pivotal role in enabling reliable, asynchronous communication between applications. One such powerful messaging system is RabbitMQ, an open-source message broker that offers robust message queuing and distribution capabilities.
While RabbitMQ provides a solid foundation for building scalable and decoupled systems, integrating it effectively with other applications requires a reliable integration platform. This is where MuleSoft comes into play. MuleSoft's Anypoint Platform offers a comprehensive set of tools and features that simplify the process of connecting, orchestrating, and managing integrations with RabbitMQ.
In this article, Api Connects will delve into the realm of integrating RabbitMQ with MuleSoft. We will focus on implementing the dead letter channel integration pattern with MuleSoft, leveraging RabbitMQ's core concepts such as channels, queues, bindings, and acknowledgment types.
What is a dead letter channel?
The Dead Letter Channel (DLC) integration pattern is a powerful mechanism used to handle and manage messages that have failed to be delivered or processed by a messaging system. It provides a way to capture and redirect these failed messages to a designated queue, known as a Dead Letter Queue (DLQ), for further analysis and resolution.
By implementing the Dead Letter Channel integration pattern, organizations can achieve greater visibility into message processing failures, ensure important messages are not lost or discarded, and enable proactive troubleshooting and resolution of issues.
Dead Letter Use Case: Overview
Dead Letter Use Case: Prerequisites
● Anypoint Platform
● Postgres
● RabbitMQ
Dead Letter Use Case: Steps
Here’s how to implement the dead letter channel integration:
Step 1
Add the AMQP module from Anypoint Exchange to the Mule project.
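In a Maven-based Mule project, adding the module from Exchange typically results in a dependency entry in pom.xml. The fragment below is a minimal sketch of such an entry, assuming the usual coordinates of the MuleSoft AMQP connector. The amqp.connector.version property is a placeholder introduced here for illustration; set it to the version shown in Exchange.

```xml
<!-- pom.xml fragment: goes inside the <dependencies> element -->
<dependency>
    <groupId>com.mulesoft.connectors</groupId>
    <artifactId>mule-amqp-connector</artifactId>
    <!-- Assumption: placeholder property, define it in <properties> with the version listed in Exchange -->
    <version>${amqp.connector.version}</version>
    <classifier>mule-plugin</classifier>
</dependency>
```

When the module is added through Anypoint Studio, this entry is normally written for you, so editing pom.xml by hand is only needed for projects built outside Studio.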
Step 2
Let’s create the Data service that will save the incoming message to the Postgres database.
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="7a8558ce-2084-4c53-ad69-cdedb467a201">
    <http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<db:config name="Database_Config" doc:name="Database Config" doc:id="58e3c609-125d-4329-b8a6-b0ea266e8cd6">
    <db:generic-connection url="jdbc:postgresql://localhost:5432/salesevent" driverClassName="org.postgresql.Driver" user="kong" password="kongpass" />
</db:config>
<flow name="dataserviceflow" doc:id="c3b269cb-1888-4047-a235-77d91af8ada0">
    <http:listener doc:name="Listener" doc:id="7b8fff1c-fb55-4fa8-a099-7b570d214ead" config-ref="HTTP_Listener_config" path="/db-service" allowedMethods="POST">
        <http:response statusCode="200" />
        <http:error-response statusCode="500" />
    </http:listener>
    <logger level="INFO" doc:name="Logger" doc:id="5d7f2935-1eba-4538-918e-51afad5faa93" message="#[payload]" />
    <set-variable value="#[output application/java --- payload.sale.itemName]" doc:name="Set item name" doc:id="b1a3892d-d731-4f0d-9c5f-8f73d2f98347" variableName="itemName" mimeType="application/java" />
    <set-variable value="#[output application/java --- payload.sale.price]" doc:name="Set price" doc:id="97f83579-3206-4f91-b87b-43f7bc3cfde5" variableName="price" />
    <db:insert doc:name="Insert" doc:id="6d195084-ed0e-4f2b-857b-6b21f15e2940" config-ref="Database_Config">
        <db:sql><![CDATA[INSERT INTO sales(itemName, price) VALUES(:itemName, :price)]]></db:sql>
        <db:input-parameters><![CDATA[#[{'itemName':vars.itemName, 'price':vars.price}]]]></db:input-parameters>
    </db:insert>
    <error-handler>
        <on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="4a334b7d-56e1-487a-8153-eacca7a25200" type="DB:BAD_SQL_SYNTAX, DB:CONNECTIVITY, DB:QUERY_EXECUTION, MULE:EXPRESSION">
            <logger level="INFO" doc:name="Logger" doc:id="74c73516-f39b-46fb-8e23-902dc1546fc4" message="Database failure" />
        </on-error-propagate>
    </error-handler>
</flow>
Step 3
The dead letter channel flow works as follows. Upon receiving a message from the AMQP listener, the flow calls the data service over HTTP to save the message to the database. If the data service reports an internal error (500), the flow retries the call up to three times. Once the message is successfully written to the database, the flow manually acknowledges the message to RabbitMQ.

If the message still cannot be written to the database after the retries, the flow reroutes it to a dead letter queue and continues processing the next incoming messages. If the HTTP call instead fails with one of the other handled HTTP errors (for example, 502 Bad Gateway), the flow skips the retry and does not acknowledge the message, so it remains in the original queue as an unacknowledged message.
<amqp:config name="AMQP_Config" doc:name="AMQP Config" doc:id="f795a1e7-f067-4fd8-a724-3f7502cd06d9">
    <amqp:connection host="localhost" port="5672" username="user" password="password" />
</amqp:config>
<http:request-config name="HTTP_Request_configuration" doc:name="HTTP Request configuration" doc:id="62639d88-e44c-42db-81df-9c25169395c8">
    <http:request-connection host="localhost" port="8081" />
</http:request-config>
<flow name="rabbitmqdemoFlow" doc:id="1a73adac-7e4f-4b95-b049-94e8b4bbf235">
    <!-- Manual ack mode: the message stays unacknowledged until amqp:ack is called -->
    <amqp:listener doc:name="Listener" doc:id="57cfd65e-4d81-4bca-9095-4b9cfbcbc0f4" config-ref="AMQP_Config" queueName="sales-event-queue" ackMode="MANUAL" outputMimeType="application/json" />
    <logger level="INFO" doc:name="Logger1" doc:id="b32bf073-4aa6-499b-82cb-3d96f0edc3db" message="#[payload]" />
    <!-- Keep the ackId so the message can be acknowledged later in the flow -->
    <set-variable value="#[attributes.ackId]" doc:name="Set Variable" doc:id="5585fe4a-d4a9-4eb8-afa1-dbd1c12fbbaa" variableName="ackId" />
    <!-- Errors not handled inside the try scope (such as a 500 response) trigger a retry, up to 3 times -->
    <until-successful maxRetries="3" doc:name="Until Successful" doc:id="15fa4187-6a31-4903-a64c-8677140c100f" millisBetweenRetries="600">
        <try doc:name="Try" doc:id="14371d2e-19ae-43f7-a220-3bae5a7e314b">
            <http:request method="POST" doc:name="Request" doc:id="1c42ce96-ef09-454b-bc3f-a2cc3de5d931" config-ref="HTTP_Request_configuration" path="/db-service" outputMimeType="application/json">
                <http:response-validator>
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
            <error-handler>
                <!-- The listed errors are handled here, so until-successful does not retry them -->
                <on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="0ea26b21-7089-43ab-9907-4650ffee849d" type="HTTP:BAD_GATEWAY, HTTP:BAD_REQUEST, HTTP:CLIENT_SECURITY, HTTP:CONNECTIVITY, HTTP:FORBIDDEN, HTTP:METHOD_NOT_ALLOWED, HTTP:NOT_ACCEPTABLE, HTTP:NOT_FOUND, HTTP:PARSING, HTTP:RETRY_EXHAUSTED, HTTP:SECURITY, HTTP:SERVICE_UNAVAILABLE, HTTP:TIMEOUT, HTTP:TOO_MANY_REQUESTS, HTTP:UNAUTHORIZED, HTTP:UNSUPPORTED_MEDIA_TYPE, EXPRESSION, STREAM_MAXIMUM_SIZE_EXCEEDED">
                    <logger level="INFO" doc:name="Logger" doc:id="94114d38-e741-4210-a289-1fe027f2c6c3" message="System service call failed, skip retry" />
                </on-error-continue>
            </error-handler>
        </try>
    </until-successful>
    <set-variable value="#[attributes.statusCode]" doc:name="Set Variable" doc:id="5a5d2c16-32dc-4665-bf0e-84105b444fed" variableName="statusCode" />
    <choice doc:name="Choice" doc:id="90620a2a-e2ca-479d-bf45-09b5a6345c48">
        <when expression="#[vars.statusCode == 200]">
            <logger level="INFO" doc:name="Logger" doc:id="2fa6bb2c-7341-461a-9222-6daad83421da" message="System service call succeeded" />
            <amqp:ack doc:name="Ack" doc:id="751ae1b9-1e6b-41e6-abfe-f9acbba1096e" ackId="#[vars.ackId]" />
        </when>
        <otherwise>
            <!-- No ack in this branch: the message remains unacknowledged in the original queue -->
            <logger level="INFO" doc:name="Logger" doc:id="b7305db3-4377-420d-b28a-70f3771065b8" message="System service call failed" />
        </otherwise>
    </choice>
    <error-handler>
        <!-- Retries exhausted: publish to the dead letter exchange, then ack the original message -->
        <on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="7722dc61-78cc-41b7-8937-b95fdebfa31c" type="MULE:RETRY_EXHAUSTED">
            <logger level="INFO" doc:name="Logger" doc:id="6ef0c2e6-6150-4569-baa9-c6b2b7a667a4" message="Error occurred. Move to dead letter queue" />
            <amqp:publish doc:name="Publish" doc:id="13eee051-286b-4020-a7da-231fe544f066" config-ref="AMQP_Config" exchangeName="salesErrorEx">
                <amqp:routing-keys>
                    <amqp:routing-key value="dead" />
                </amqp:routing-keys>
            </amqp:publish>
            <amqp:ack doc:name="Ack" doc:id="9818c897-9215-4a5b-818f-1570f2a7b0a7" ackId="#[vars.ackId]" />
            <logger level="INFO" doc:name="Logger1" doc:id="0dd13899-6b4c-49c9-89ae-719e3bca074f" message="Move to dead letter queue success" />
        </on-error-continue>
    </error-handler>
</flow>
Furthermore, it is possible to implement an API that allows manual retrieval of messages that have been routed to the dead letter queue. This provides a robust recovery mechanism to ensure no messages are lost from the system.
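A retrieval API of this kind could be sketched as a small Mule flow that consumes one message from the dead letter queue per HTTP request. This is only an illustration under several assumptions: the queue name sales-dead-letter-queue, the listener config DLQ_HTTP_Listener_config, port 8082, and the path /dead-letters are all hypothetical names that do not appear in the setup above, and the queue is assumed to be bound to the salesErrorEx exchange with the routing key dead. The attribute names on amqp:consume and the AMQP:TIMEOUT error type reflect our understanding of the AMQP connector and should be checked against the connector reference for your version.

```xml
<!-- Assumption: separate listener config on a hypothetical port so it does not clash with the data service -->
<http:listener-config name="DLQ_HTTP_Listener_config">
    <http:listener-connection host="0.0.0.0" port="8082" />
</http:listener-config>
<flow name="deadLetterRetrievalFlow">
    <!-- Each GET request tries to fetch a single message from the dead letter queue -->
    <http:listener config-ref="DLQ_HTTP_Listener_config" path="/dead-letters" allowedMethods="GET" />
    <!-- Assumption: "sales-dead-letter-queue" is a hypothetical queue bound to salesErrorEx with routing key "dead" -->
    <amqp:consume config-ref="AMQP_Config" queueName="sales-dead-letter-queue" ackMode="IMMEDIATE" maximumWait="2000" maximumWaitUnit="MILLISECONDS" outputMimeType="application/json" />
    <logger level="INFO" message="#[payload]" />
    <error-handler>
        <!-- No message arrived within the wait time: report an empty queue instead of failing -->
        <on-error-continue type="AMQP:TIMEOUT">
            <set-payload value="#[output application/json --- {message: 'Dead letter queue is empty'}]" />
        </on-error-continue>
    </error-handler>
</flow>
```

With ackMode="IMMEDIATE", a message is removed from the dead letter queue as soon as it is retrieved. If the caller should be able to inspect a message and only then decide whether to discard it, manual acknowledgment would be the safer choice.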
Finally, let's publish the following message to RabbitMQ.
{
    "sale": {
        "itemName": "Printer",
        "price": "110.10"
    }
}
After successful processing, the message appears as a new row in the sales table of the database.
Error scenarios
1. When the data service returns 502 Bad Gateway, the message is preserved in the original queue as an unacknowledged message.
2. When the data service returns 500 Internal Server Error, the message is rerouted to the dead letter queue once the retries are exhausted.
Integrating RabbitMQ with MuleSoft opens up a world of possibilities for building efficient and resilient messaging systems. Throughout this article, we explored the key concepts of RabbitMQ, such as channels, queues, bindings, and acknowledgment types. We also delved into the powerful Dead Letter Channel integration pattern, which helps capture and handle failed messages.
Now, it's time to apply what you've learned and unlock the true potential of your integration landscape by seamlessly integrating with RabbitMQ using MuleSoft.
Still have questions? Leave them in the comments or email at [email protected] and get a quick reply from the experts.
Also discover our MuleSoft Azure technology to gain the power of an open, flexible, and scalable cloud platform.