Low-Level Integrations using MuleSoft - Part 3
This is the 3rd and final part of this article about low-level integrations using MuleSoft.
In the first part, a scenario was presented where a company that manufactures white-label (unbranded) products needs to synchronize data between its industrial printers, its Salesforce CRM and its Xero Accounting system, and to track and monitor everything via dashboards in Tableau. In the second part, a proposed solution design and technical specification were presented, and now we finish with the implementation of this solution in both Salesforce and MuleSoft. Remember that, for brevity and focus, the integrations with Tableau and Xero will be mentioned but not fully implemented.
Implementation in ActiveMQ
Given this is for demonstration purposes, this part is very straightforward: all one needs to do is log in to the ActiveMQ Admin Portal and create these 2 queues: Print_IN_Queue and Print_OUT_Queue.
Other than that, one should just make sure to capture the appropriate host and port details from ActiveMQ as these will be configured as part of the MuleSoft application.
Implementation in Salesforce
As mentioned in part 2, Salesforce plays a vital role and is in fact the master data system of this solution. Having defined all the custom objects, picklists, platform events, etc., it is now time to implement the automation around them.
Triggers
Print Batch Updated Trigger
When a Print Batch record has its status changed to "READY", a trigger publishes a new platform event so that one of the printers can receive a new print batch job. A suggested implementation for such a trigger is shown below:
trigger PrintBatchRecordUpdateTrigger on Print_Batch__c (after update) {

    // Query all labels belonging to the batches being updated, including the
    // parent batch fields the printers will need.
    List<Print_Batch_Label__c> printBatchLabels = [SELECT Name,
                                                          Print_Batch__r.Id,
                                                          Print_Batch__r.Number__c,
                                                          Print_Batch__r.Account__r.Id,
                                                          Print_Batch__r.Size__c,
                                                          Print_Batch__r.CreatedDate,
                                                          Print_Batch__r.Status__c,
                                                          Id,
                                                          Number__c,
                                                          Product__r.Id,
                                                          Quantity__c,
                                                          CreatedDate,
                                                          Status__c
                                                   FROM Print_Batch_Label__c
                                                   WHERE Print_Batch__c IN :Trigger.new];

    // Publish one platform event per label that is still to be printed.
    for (Print_Batch_Label__c printBatchLabelItem : printBatchLabels) {
        if (printBatchLabelItem.Status__c == null ||
            printBatchLabelItem.Status__c == 'READY') {

            EventBus.publish(
                new Print_Batch_Ready_to_Print__e(
                    Print_Batch_Labels_JSON__c = JSON.serialize(printBatchLabelItem)
                )
            );
        }
    }
}
OK, with that implemented, whenever new events are published a MuleSoft application will be able to pick them up and process them, as we will see later in the article.
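If you want to see the event being published straight away, a quick Anonymous Apex sketch like the one below can flip an existing batch to "READY" and fire the trigger. It is purely illustrative and assumes at least one Print_Batch__c record with related Print_Batch_Label__c records already exists in the org:
// Anonymous Apex sketch (illustrative): flip an existing batch to READY so the
// trigger above runs and publishes Print_Batch_Ready_to_Print__e events.
// Assumes at least one Print_Batch__c with related Print_Batch_Label__c records exists.
Print_Batch__c batch = [SELECT Id, Status__c FROM Print_Batch__c LIMIT 1];
batch.Status__c = 'READY';
update batch; // after update: one platform event is published per READY (or blank-status) label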
Print Batch Label Status Changed Event
The next thing is to implement a trigger that acts as a subscriber to the events coming from the printers (e.g. when a print batch has finished printing), as shown below:
trigger PrintBatchLabelStatusChangedTrigger on Print_Batch_Label_Status_Changed__e (after insert) {

    for (Print_Batch_Label_Status_Changed__e event : Trigger.New) {
        Map<String, Object> jsonObject = (Map<String, Object>) JSON.deserializeUntyped(event.Print_Batch_Label_JSON__c);

        if (jsonObject.get('rowType').toString().startsWith('BATCH_LABEL')) {

            // Status update for an individual label.
            for (Print_Batch_Label__c printBatchLabelItem : [SELECT Id
                                                             FROM Print_Batch_Label__c
                                                             WHERE Id = :jsonObject.get('batchLabelId').toString()]) {
                printBatchLabelItem.Status__c = jsonObject.get('batchLabelStatus').toString();

                if (jsonObject.get('rowType') == 'BATCH_LABEL_FINISH' ||
                    jsonObject.get('rowType') == 'BATCH_LABEL_FAILURE') {
                    printBatchLabelItem.Date_Time_Finished__c = DateTime.parse(jsonObject.get('batchLabelDateTime').toString());
                }

                if (jsonObject.get('rowType') == 'BATCH_LABEL_START') {
                    printBatchLabelItem.Date_Time_Started__c = DateTime.parse(jsonObject.get('batchLabelDateTime').toString());
                }

                update printBatchLabelItem;
            }

        } else {

            // Status update for the batch as a whole.
            for (Print_Batch__c printBatchItem : [SELECT Id
                                                  FROM Print_Batch__c
                                                  WHERE Id = :jsonObject.get('batchId').toString()]) {
                printBatchItem.Status__c = jsonObject.get('batchStatus').toString();

                if (jsonObject.get('rowType') == 'BATCH_FINISH' ||
                    jsonObject.get('rowType') == 'BATCH_FAILURE') {
                    printBatchItem.Date_Time_Finished__c = DateTime.parse(jsonObject.get('batchDateTime').toString());
                }

                if (jsonObject.get('rowType') == 'BATCH_START') {
                    printBatchItem.Date_Time_Started__c = DateTime.parse(jsonObject.get('batchDateTime').toString());
                }

                update printBatchItem;
            }
        }
    }
}
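Before wrapping up the Salesforce side, note that this trigger can be exercised without MuleSoft or a printer in the loop by publishing a test event from Anonymous Apex. A minimal sketch is shown below; the status and date/time values are placeholders of mine (and DateTime.parse expects the running user's locale format), so adjust them to whatever your org and copybook actually use:
// Anonymous Apex sketch (illustrative): simulate a printer reporting that a whole batch has finished.
Map<String, Object> printerUpdate = new Map<String, Object>{
    'rowType'       => 'BATCH_FINISH',
    'batchId'       => [SELECT Id FROM Print_Batch__c LIMIT 1].Id,
    'batchStatus'   => 'FINISHED',           // placeholder status value
    'batchDateTime' => '12/05/2024 14:30:00' // placeholder; must match DateTime.parse's locale format
};
EventBus.publish(new Print_Batch_Label_Status_Changed__e(
    Print_Batch_Label_JSON__c = JSON.serialize(printerUpdate)
));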
And voilà! The Salesforce implementation is pretty much done and we can now turn our focus to the implementation in MuleSoft. Remember that the integration with Xero, which could be done by calling the Xero API or by using MuleSoft's out-of-the-box connector, is out of scope for this implementation, as the focus is on the low-level integration. Let's do this!
Implementation in MuleSoft
Print Batch Process API
Starting with application configuration, let's create a "config.yaml" in resources. That file should have contents similar to these:
salesforce:
  username: AAABBBCCCDDD
  password: EEEFFFGGGHHH
  securityToken: A1b2C3d4E5f6G7h8I9j10K11l12

broker:
  host: "127.0.0.1"
  url: "tcp://eduardo-ponzoni-datacom-pc:61616"
  port: "5672"
  username: "guest"
  password: "guest"
  queue:
    inbound: "Print_IN_Queue"
    outbound: "Print_OUT_Queue"
Next we create a new configuration file named "connectors.xml" containing the Global Configuration Elements for this application, where the corresponding XML looks like the snippet below:
<configuration-properties doc:name="Configuration" file="config.yaml" />

<jms:config name="JMS_Config" doc:name="JMS Config">
  <jms:active-mq-connection username="${broker.username}"
                            password="${broker.password}">
    <jms:factory-configuration brokerUrl="${broker.url}" />
  </jms:active-mq-connection>
</jms:config>

<salesforce:sfdc-config name="Salesforce_Config" doc:name="Salesforce Config">
  <salesforce:basic-connection username="${salesforce.username}"
                                password="${salesforce.password}"
                                securityToken="${salesforce.securityToken}" />
</salesforce:sfdc-config>
This application is made up of 2 flows:
Print Batch Ready (Salesforce Event Subscriber)
This flow simply subscribes to the "Print Batch Ready to Print" Salesforce Platform Event and, upon receipt of a new event, extracts the JSON content from the payload (Print_Batch_Labels_JSON__c) and publishes a new message to the Print (IN) Queue, as per the screenshot and implementation below:
<flow name="print-batch-process-apiFlow">
<salesforce:subscribe-channel-listener?
doc:name="Print Batch Ready to Print"
streamingChannel="/event/Print_Batch_Ready_to_Print__e"
config-ref="Salesforce_Config"/>
<jms:publish
doc:name="To Print (IN) Queue"
config-ref="JMS_Config"
destination="${broker.queue.inbound}">
<jms:message >
<jms:body >
<![CDATA[#[output application/json
---
read(payload[0].payload.Print_Batch_Labels_JSON__c, "application/json")]]]>
</jms:body>
</jms:message>
</jms:publish>
</flow>
Print Batch/Label Printer Status Changed (Salesforce Event Publisher)
In contrast to the first flow, this one listens for messages on the Print (OUT) Queue containing Print Batch or Print Batch Label changes coming from the printers. Upon receipt of a new message in this queue, the flow parses the COBOL Copybook contents and transforms them into JSON before publishing a new "Print Batch/Label Status Changed" Salesforce Platform Event, as demonstrated by the screenshot and code snippets below:
 __________________________
/                          \
| MULESOFT FLOW CODE (XML) |
\__________________________/
<flow name="new-print-batch-subscriber-flow">
<jms:listener doc:name="In Print (OUT) Queue"
? ? ? ? ? ? ? config-ref="JMS_Config"
? destination="${broker.queue.outbound}">
<jms:consumer-type >
<jms:queue-consumer />
</jms:consumer-type>
</jms:listener>
<set-payload value="#[payload]"
? ? ? ? ? ? ?doc:name="Set Payload"
? ? ? ? ? ? ?mimeType="application/flatfile; schemapath=batch-print.ffd" />
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload resource="dw/COBOLCopybook_to_SFEvent.dwl" />
</ee:message>
</ee:transform>
<salesforce:publish-platform-event-message platformEventName="Print_Batch_Label_Status_Changed__e"?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?doc:name="Publish platform event message"
? ?config-ref="Salesforce_Config" />
</flow>
 _______________________________________________
/                                               \
| DATAWEAVE CODE (COBOLCopybook_to_SFEvent.dwl) |
\_______________________________________________/
%dw 2.0
output application/java
// Date (10 characters) and time (8 characters) are padded with blanks when the printer did not send them.
var eventPayload = {
    rowType: payload[0]."batch-row-type",
    batchId: payload[0]."batch-id",
    batchNumber: payload[0]."batch-number",
    batchCustomerId: payload[0]."batch-customer-id",
    batchSize: payload[0]."batch-size",
    batchDateTime: (payload[0]."batch-date" default "          ") ++ " " ++ (payload[0]."batch-time" default "        "),
    batchStatus: payload[0]."batch-status",
    batchLabelId: payload[0]."batch-label-id",
    batchLabelNumber: payload[0]."batch-label-number",
    batchLabelProductId: payload[0]."batch-product-id",
    batchLabelQuantity: payload[0]."batch-label-quantity",
    batchLabelDateTime: (payload[0]."batch-label-date" default "          ") ++ " " ++ (payload[0]."batch-label-time" default "        "),
    batchLabelStatus: payload[0]."batch-label-status"
}
---
[{
    Print_Batch_Label_JSON__c: write(eventPayload, "application/json")
}]
And the first part of the MuleSoft implementation is done. We can now work on its counterpart, which I have purposely left to the end of this article, where we actually deal with low-level integration by talking to the printers via TCP Sockets.
Printer I/O Process API
Similar to the API that deals with Salesforce, we start by creating our configuration properties in a "config.yaml" file that looks like the snippet below:
tcp:
  inboundHost: "0.0.0.0"
  inboundPort: "51950"

broker:
  host: "127.0.0.1"
  url: "tcp://eduardo-ponzoni-datacom-pc:61616"
  port: "5672"
  username: "guest"
  password: "guest"
  queue:
    inbound: "Print_IN_Queue"
    outbound: "Print_OUT_Queue"

printer1:
  tcp:
    outboundHost: "127.0.0.1"
    outboundPort: "51951"

printer2:
  tcp:
    outboundHost: "127.0.0.1"
    outboundPort: "51952"

printer3:
  tcp:
    outboundHost: "127.0.0.1"
    outboundPort: "51953"
As seen in these configuration properties, we have corresponding host and port information for 3 printers. As mentioned in the first 2 parts, this is because we have to balance the load between them using a very simplistic round-robin logic: each host/port pair corresponds to one of the 3 printers, and each new print batch job is sent to the next printer in the sequence (printer1, printer2, printer3, then back to printer1).
Next we create corresponding Global Configuration Elements in a separate "connectors.xml" configuration file as per screenshot and code shown next:
<configuration-properties file="config.yaml" />

<os:object-store name="Object_store" doc:name="Object store" />

<sockets:request-config name="Printers_Sockets_Request_config"
                        doc:name="Sockets Request config">
  <sockets:tcp-requester-connection host="#[vars.printerConfiguration.outboundHost]"
                                    port="#[vars.printerConfiguration.outboundPort]" />
</sockets:request-config>

<jms:config name="JMS_Config" doc:name="JMS Config">
  <jms:active-mq-connection username="${broker.username}"
                            password="${broker.password}">
    <jms:factory-configuration brokerUrl="${broker.url}" />
  </jms:active-mq-connection>
</jms:config>

<sockets:listener-config name="TCP_Sockets_Listener_config"
                         doc:name="Sockets Listener config">
  <sockets:tcp-listener-connection host="${tcp.inboundHost}"
                                   port="${tcp.inboundPort}"
                                   keepAlive="true">
    <reconnection>
      <reconnect-forever />
    </reconnection>
    <sockets:protocol>
      <sockets:direct-protocol rethrowExceptionOnRead="true" />
    </sockets:protocol>
  </sockets:tcp-listener-connection>
</sockets:listener-config>
Now that the connectors and corresponding configuration are in place, we can move on to the actual implementation. This application is made up of 2 flows:
New Print Batch Sender Flow
We start by working on the first flow, which is responsible for subscribing to messages in the Print (IN) Queue. Upon receipt of a new message (a new print batch ready), it determines which printer should be responsible for printing, transforms the payload from the Salesforce Platform Event JSON to COBOL Copybook format, and then establishes a TCP Socket connection and transmits the data to the printer. The screenshot and snippet below illustrate how that flow works:
 __________________________
/                          \
| MULESOFT FLOW CODE (XML) |
\__________________________/
<flow name="new-print-batch-sender-flow">
<jms:listener doc:name="In Print (IN) Queue"?
? ? ? ? ? ? ? config-ref="JMS_Config" ackMode="MANUAL"
? inboundContentType="application/json"
? destination="${broker.queue.inbound}">
<jms:consumer-type >
<jms:queue-consumer />
</jms:consumer-type>
</jms:listener>
<set-variable value="#[attributes.ackId]" doc:name="ackId" variableName="ackId"/>
<os:retrieve doc:name="Last Printer Id" key="lastPrinterId" objectStore="Object_store" target="lastPrinterId">
<os:default-value ><![CDATA[#[0]]]></os:default-value>
</os:retrieve>
<ee:transform doc:name="Determine Printer to Execute Print Batch Job">
<ee:message >
</ee:message>
<ee:variables >
<ee:set-variable variableName="printerConfiguration" ><![CDATA[%dw 2.0
output application/java
var printer = "printer" ++ (if (vars.lastPrinterId >= 3) 1 else (vars.lastPrinterId as Number + 1))
---
{
outboundHost: p(printer ++ ".tcp.outboundHost"),
outboundPort: p(printer ++ ".tcp.outboundPort")
}]]></ee:set-variable>
</ee:variables>
</ee:transform>
<ee:transform doc:name="Salesforce Platform Event to COBOL Copybook">
<ee:message>
<ee:set-payload resource="dw/SFEvent_to_COBOLCopybook.dwl" />
</ee:message>
</ee:transform>
<sockets:send doc:name="Print Batch Message to Printer" config-ref="Printers_Sockets_Request_config" />
<os:store doc:name="Last Printer Id" key="lastPrinterId" objectStore="Object_store">
<os:value ><![CDATA[#[((vars.lastPrinterId as Number) + 1)]]]></os:value>
</os:store>
<jms:ack doc:name="Ack" ackId="#[vars.ackId]"/>
</flow>
 _______________________________________________
/                                               \
| DATAWEAVE CODE (SFEvent_to_COBOLCopybook.dwl) |
\_______________________________________________/
%dw 2.0
output application/flatfile schemaPath="batch-print.ffd", segmentIdent="batch-print"

// Convert the Salesforce ISO 8601 date/time strings into the copybook's DD/MM/YYYY and HH:MM:SS fields.
fun getDateAsDDMMYYYY(dateTimeValue) = (
    dw::core::Strings::substring(dateTimeValue, 8, 10) ++ "/" ++
    dw::core::Strings::substring(dateTimeValue, 5, 7) ++ "/" ++
    dw::core::Strings::substring(dateTimeValue, 0, 4)
)

fun getTimeAsHHMMSS(dateTimeValue) = (
    dw::core::Strings::substring(dateTimeValue, 11, 19)
)

fun transformToCopybook(value) = (
    value map ((item, index) -> {
        "batch-row-type": "BATCH_START",
        "batch-id": item.Print_Batch__r.Id,
        "batch-number": item.Print_Batch__r.Number__c default "1",
        "batch-customer-id": item.Print_Batch__r.Account__r.Id,
        // Size__c is the batch size field selected by the Salesforce trigger's query.
        "batch-size": item.Print_Batch__r.Size__c default "1",
        "batch-date": getDateAsDDMMYYYY(item.Print_Batch__r.CreatedDate),
        "batch-time": getTimeAsHHMMSS(item.Print_Batch__r.CreatedDate),
        "batch-status": item.Status__c default "READY",
        "batch-label-id": item.Id,
        "batch-label-number": item.Number__c default "1",
        "batch-product-id": item.Product__r.Id,
        "batch-label-quantity": item.Quantity__c default "1",
        "batch-label-date": getDateAsDDMMYYYY(item.CreatedDate),
        "batch-label-time": getTimeAsHHMMSS(item.CreatedDate),
        "batch-label-status": item.Status__c default "READY"
    })
)
---
if (payload is Array) transformToCopybook(payload) else transformToCopybook([ payload ])
Printer Batch/Label Status Changed Listener Flow
Finishing our solution implementation, this "listener" flow is responsible for starting a TCP Socket server that waits for COBOL Copybook messages from any of the printers containing status updates for a batch or for individual labels. Upon receipt of a message on this TCP channel, the flow simply publishes the message as received to the Print (OUT) Queue for further processing, as seen before in the Salesforce event publisher flow. Yet again, this very simple and straightforward flow is represented by the screenshot and code snippet below:
<flow name="printer-io-status-changed-listener-flow">
<sockets:listener doc:name="TCP server listener"
outputMimeType="text/flatfile"
config-ref="TCP_Sockets_Listener_config"/>
<jms:publish doc:name="Message to Print (OUT) Queue"
config-ref="JMS_Config"
destination="${broker.queue.outbound}">
<jms:message outboundContentType='application/flatfile' />
</jms:publish>
</flow>
And we are done! This concludes the implementation part and this series of 3 articles, which aimed at recapitulating and demonstrating some of the capabilities that MuleSoft offers out of the box: capabilities that are sometimes neglected or left unused but that, as we have seen, are certainly still very much needed.
To see all of that in action without having the actual printers, I've put together a little Spring Boot application that "plays the role" of the printer for demonstration purposes, which can be downloaded from my GitHub. Otherwise, you can have a little sneak peek based on the screenshots below, taken from both Salesforce and the little printer app.
Awesome! And that's it, folks! I hope you have enjoyed it and that it can actually benefit you and your company should you face a similar challenge and need to go back to basics and work with low-level integrations. Go on, have fun and take your Mule for a ride. Whether you go high-level or low-level, I will leave to your particular needs and capable hands.
Happy integrations!