Low-Level Integrations using MuleSoft - Part 3

Low-Level Integrations using MuleSoft - Part 3

This is the 3rd and final part of this article about low-level integrations using MuleSoft.

In the first part a scenario was presented where a company that manufactures white-label (unbranded) products needs to synchronize data between their industrial printers with their Salesforce CRM and Xero Accounting systems and track/monitor via dashboards in Tableau. In the second part a proposed solution design and technical specification was presented and now we finish it with the implementation of this solution in both Salesforce and MuleSoft. Remembering that for brevity and focus, the integration with Tableau and Xero will be mentioned, but not fully implemented.

Implementation in ActiveMQ

Given this is for demonstration purposes, this is very straightforward and all that one would need to to is to login to the ActiveMQ Admin Portal and create these 2 queues:

  • Print_IN_Queue
  • Print_OUT_Queue

Other than that, one should just make sure to capture the appropriate host and port details from ActiveMQ as these will be configured as part of the MuleSoft application.

Implementation in Salesforce

As mentioned in part 2, salesforce plays a vital role and is in fact the master data of this solution. After creating all the custom objects, picklists, platform events, etc., this is the time to implement all of them.

Triggers

Print Batch Updated Trigger

When a Print Batch record has its status changed to “READY”, a trigger will publish a new platform event so that one of the printers can receive a new print batch job. A suggested implementation for such a trigger would be as per below:

trigger PrintBatchRecordUpdateTrigger on Print_Batch__c (after update) 
? ??
? ? List<Print_Batch_Label__c> printBatchLabels = [SELECT Name,?
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.Id,?
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.Number__c,?
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.Account__r.Id,?
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.Size__c,
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.CreatedDate,
? ? ? ? ? ? ? ? ? ? ? ? Print_Batch__r.Status__c,
? ? ? ? ? ? ? ? ? ? ? ? Id,
? ? ? ? ? ? ? ? ? ? ? ? Number__c,
? ? ? ? ? ? ? ? ? ? ? ? Product__r.Id,
? ? ? ? ? ? ? ? ? ? ? ? Quantity__c,
? ? ? ? ? ? ? ? ? ? ? ? CreatedDate,
? ? ? ? ? ? ? ? ? ? ? ? Status__c
? ? ? ? ? ? ? ? ? ?FROM Print_Batch_Label__c
                  WHERE Print_Batch__c IN : Trigger.new];
? ??
	for (Print_Batch_Label__c printBatchLabelItem : printBatchLabels) {


? ? ? ? if (printBatchLabelItem.Status__c == null ||
            printBatchLabelItem.Status__c == 'READY') {
? ? ? ??
? ? ? ? ? ? EventBus.publish(
? ? ? ? ? ? ? ? new Print_Batch_Ready_to_Print__e(
? ? ? ? ? ? ? ? ? ? Print_Batch_Labels_JSON__c = JSON.serialize(printBatchLabelItem)
? ? ? ? ? ? ? ? )
? ? ? ? ? ? );
? ? ? ? }
? ? }
}        

Ok, with that implemented, when new events are published, a MuleSoft application will be able to pickup and process as expected as we will see later on in the article.

Print Batch Label Status Changed Event

Next thing is to implement a trigger that works as a subscriber to events coming out of the printers (e.g. when a print batch has finished printing) as per below:

trigger PrintBatchLabelStatusChangedTrigger on Print_Batch_Label_Status_Changed__e (after insert) 


? ? for (Print_Batch_Label_Status_Changed__e event : Trigger.New) {


? ? ? ? Map<String, Object> jsonObject = (Map<String, Object>) JSON.deserializeUntyped(event.Print_Batch_Label_JSON__c);
? ? ? ??
? ? ? ? if (jsonObject.get('rowType').toString().startsWith('BATCH_LABEL')) {
? ? ? ? ? ??
			for (Print_Batch_Label__c printBatchLabelItem : [SELECT Id
                                                               FROM Print_Batch_Label__c
                                                              WHERE Id = :jsonObject.get('batchLabelId').toString()]) {


? ? ? ? ? ? ? ? printBatchLabelItem.Status__c = jsonObject.get('batchLabelStatus').toString();


? ? ? ? ? ? ? ? if (jsonObject.get('rowType') == 'BATCH_LABEL_FINISH' || jsonObject.get('rowType') == 'BATCH_LABEL_FAILURE') {
? ? ? ? ? ? ? ? 	printBatchLabelItem.Date_Time_Finished__c = DateTime.parse(jsonObject.get('batchLabelDateTime'));
? ? ? ? ? ? 	}
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? if (jsonObject.get('rowType') == 'BATCH_LABEL_START') {? ? ? ? ??
? ? ? ? ? ? ? ? 	printBatchLabelItem.Date_Time_Started__c = DateTime.parse(jsonObject.get('batchLabelDateTime'));
? ? ? ? ? ? 	}
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? update printBatchLabelItem;
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? } else {
? ? ? ? ? ? ? ?
			for (Print_Batch__c printBatchItem : [SELECT Id
                                                    FROM Print_Batch__c
                                                   WHERE Print_Batch__c.Id = :jsonObject.get('batchId').toString()]) {


? ? ? ? ? ? ? ? printBatchItem.Status__c = jsonObject.get('batchStatus').toString();


? ? ? ? ? ? ? ? if (jsonObject.get('rowType') == 'BATCH_FINISH' || 
                    jsonObject.get('rowType') == 'BATCH_FAILURE') {
? ? ? ? ? ? ? ? 	printBatchItem.Date_Time_Finished__c = DateTime.parse(jsonObject.get('batchDateTime'));
? ? ? ? ? ? 	}
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? if (jsonObject.get('rowType') == 'BATCH_START') {? ? ? ? ??
? ? ? ? ? ? ? ? 	printBatchItem.Date_Time_Started__c = DateTime.parse(jsonObject.get('batchDateTime'));
? ? ? ? ? ? 	}
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? update printBatchItem;
? ? ? ? ? ? }
		}
? ? }
}        

And voila! Salesforce implementation is pretty much done and we can now turn our focus to the implementation in MuleSoft. Reminding that the integration with XERO, which can be made via calling the XERO API or using the out-of-the-box connector of MuleSoft is out of scope for this implementation as the focus is on the low-level integration. Let's do this!

Implementation in MuleSoft

Print Batch Process API

Starting with application configuration, let's create a "config.yaml" in resources. That file should have contents similar to these:

salesforce:
? username: AAABBBCCCDDD
? password: EEEFFFGGGHHH
? securityToken: A1b2C3d4E5f6G7h8I9j10K11l12

broker:
? host: "127.0.0.1"
? url: "tcp://eduardo-ponzoni-datacom-pc:61616"
? port: "5672"
? username: "guest"
? password: "guest"
? queue:
? ? inbound: "Print_IN_Queue"
? ? outbound: "Print_OUT_Queue"        

Next we create a new configuration file named "connectors.xml" containing the following Global Configuration Elements:

No alt text provided for this image

Where the XML code corresponding to them looks like as the snippet below:

<configuration-properties doc:name="Configuration" file="config.yaml" />

<jms:config name="JMS_Config" doc:name="JMS Config">
? <jms:active-mq-connection username="${broker.username}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? password="${broker.password}" >
? ? <jms:factory-configuration brokerUrl="${broker.url}" />
? </jms:active-mq-connection>
</jms:config>
	
<salesforce:sfdc-config name="Salesforce_Config" doc:name="Salesforce Config">
? <salesforce:basic-connection username="${salesforce.username}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?password="${salesforce.password}"
							? ?securityToken="${salesforce.securityToken}" />
</salesforce:sfdc-config>
        

This application is further comprised by 2 flows:

  • Print Batch Ready (Salesforce Event Subscriber)
  • Print Batch/Label Printer Status Changed (Salesforce Event Publisher)

Print Batch Ready (Salesforce Event Subscriber)

This flow simply subscribes to Salesforce Platform Events "Print Batch Ready to Print" and upon receipt of a new event, extracts the JSON content from payload (Print_Batch_Labels_JSON__c) and publishes a new message to Print (IN) Queue as per the screenshot and implementation below:

No alt text provided for this image
<flow name="print-batch-process-apiFlow"> 
	<salesforce:subscribe-channel-listener?
		doc:name="Print Batch Ready to Print"
		streamingChannel="/event/Print_Batch_Ready_to_Print__e"
		config-ref="Salesforce_Config"/>
	<jms:publish
		doc:name="To Print (IN) Queue"
		config-ref="JMS_Config"
		destination="${broker.queue.inbound}">
		<jms:message >
			<jms:body >
				<![CDATA[#[output application/json
---
read(payload[0].payload.Print_Batch_Labels_JSON__c, "application/json")]]]>
			</jms:body>
		</jms:message>
	</jms:publish>
</flow>        

Print Batch/Label Printer Status Changed (Salesforce Event Publisher)

In contrast to the first flow, this flow listens to messages in Print (OUT) Queue containing Print Batch or Print Batch Label changes coming out of the printers and upon receipt of a new message in this queue, this flow parses the COBOL Copybook contents and transforms to a JSON format before publishing a new Salesforce Platform Event "Print Batch/Label Status Changed" as demonstrated by the screenshot and code snippets below:

No alt text provided for this image
 __________________________
/                          \
| MULESOFT FLOW CODE (XML) |
\__________________________/

<flow name="new-print-batch-subscriber-flow">
	<jms:listener doc:name="In Print (OUT) Queue"
	? ? ? ? ? ? ? config-ref="JMS_Config"
				? destination="${broker.queue.outbound}">
		<jms:consumer-type >
			<jms:queue-consumer />
		</jms:consumer-type>
	</jms:listener>
	<set-payload value="#[payload]"
	? ? ? ? ? ? ?doc:name="Set Payload"
	? ? ? ? ? ? ?mimeType="application/flatfile; schemapath=batch-print.ffd" />
	<ee:transform doc:name="Transform Message" >
		<ee:message >
			<ee:set-payload resource="dw/COBOLCopybook_to_SFEvent.dwl" />
		</ee:message>
	</ee:transform>
	<salesforce:publish-platform-event-message platformEventName="Print_Batch_Label_Status_Changed__e"?
	? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?doc:name="Publish platform event message"
											? ?config-ref="Salesforce_Config" />
	
</flow>

 _______________________________________________
/                                               \
| DATAWEAVE CODE (COBOLCopybook_to_SFEvent.dwl) |
\_______________________________________________/

%dw 2.0
output application/java
var eventPayload = {
	rowType: payload[0]."batch-row-type",
	batchId: payload[0]."batch-id",
	batchNumber: payload[0]."batch-number",
	batchCustomerId: payload[0]."batch-customer-id",
	batchSize: payload[0]."batch-size",
	batchDateTime: (payload[0]."batch-date" default "? ? ? ? ? ") ++ " " ++ (payload[0]."batch-time" default "? ? ? ? "),
	batchStatus: payload[0]."batch-status",
	batchLabelId: payload[0]."batch-label-id",
	batchLabelNumber: payload[0]."batch-label-number",
	batchLabelProductId: payload[0]."batch-product-id",
	batchLabelQuantity: payload[0]."batch-label-quantity",
	batchLabelDateTime: (payload[0]."batch-label-date" default "? ? ? ? ? ") ++ " " ++ (payload[0]."batch-label-time" default "? ? ? ? "),
	batchLabelStatus: payload[0]."batch-label-status"
}
---
[{
	Print_Batch_Label_JSON__c: write(eventPayload, "application/json")
}]
        

And first part of the MuleSoft implementation is done. We can now work on the counterpart, which I have purposely left to the end of this article, where we in fact deal with low-level integration talking to the Printers via TCP Sockets.

Printer I/O Process API

Similar to the API that deals with Salesforce, in this one we start by creating our configuration properties in a "config.yaml" file that looks like the snippet below:

tcp
? inboundHost: "0.0.0.0"
? inboundPort: "51950"
??
broker:
? host: "127.0.0.1"
? url: "tcp://eduardo-ponzoni-datacom-pc:61616"
? port: "5672"
? username: "guest"
? password: "guest"
? queue:
? ? inbound: "Print_IN_Queue"
? ? outbound: "Print_OUT_Queue"
? ??
printer1:
? tcp:
? ? outboundHost: "127.0.0.1"
? ? outboundPort: "51951"
? ??
printer2:
? tcp:
? ? outboundHost: "127.0.0.1"
? ? outboundPort: "51952"
? ??
printer3:
? tcp:
? ? outboundHost: "127.0.0.1"
? ? outboundPort: "51953"        

As seen in these configuration properties, we have corresponding host and port information for 3 printers. In our case this is because as mentioned in the first 2 parts, we have to balance the load in between them applying a very simplistic round-robin logic. Each pair of host and port correspond to one of the 3 printers.

Next we create corresponding Global Configuration Elements in a separate "connectors.xml" configuration file as per screenshot and code shown next:

No alt text provided for this image
<os:object-store name="Object_store" doc:name="Object store">

<configuration-properties file="config.yaml" />

<sockets:request-config name="Printers_Sockets_Request_config" 
                        doc:name="Sockets Request config">
<sockets:tcp-requester-connection
      host="#[vars.printerConfiguration.outboundHost]"
      port="#[vars.printerConfiguration.outboundPort]" />
</sockets:request-config>
    
<jms:config name="JMS_Config" doc:name="JMS Config">
    <jms:active-mq-connection username="${broker.username}" 
                              password="${broker.password}"? >
        <jms:factory-configuration brokerUrl="${broker.url}" />
    </jms:active-mq-connection>
</jms:config>
  
   
<sockets:listener-config name="TCP_Sockets_Listener_config"
                         doc:name="Sockets Listener config" >
    <sockets:tcp-listener-connection host="${tcp.inboundHost}" 
                                     port="${tcp.inboundPort}"
                                     keepAlive="true">
        <reconnection>
    	    <reconnect-forever />
    	</reconnection>
    	<sockets:protocol>
    	    <sockets:direct-protocol rethrowExceptionOnRead="true" />
    	</sockets:protocol>
    </sockets:tcp-listener-connection>
</sockets:listener-config>>        

Now that the connectors and corresponding configuration is in place, we can move on to the actual implementation. This application is comprised by 2 flows:

  • New Print Batch Sender Flow
  • Printer Batch/Label Status Changed Listener

New Print Batch Sender Flow

We start by working on the first flow, which is responsible for subscribing to messages in Print (IN) Queue and upon receipt of a new message (new print batch ready), determines what printer should be responsible for printing and then transforms the payload from Salesforce Platform Event JSON to COBOL Copybook format before establishing a TCP Socket connection and transmitting the data to the printer. The below screenshot and snippet illustrate how that flow works:

No alt text provided for this image
 __________________________
/                          \
| MULESOFT FLOW CODE (XML) |
\__________________________/


<flow name="new-print-batch-sender-flow"> 
	<jms:listener doc:name="In Print (IN) Queue"?
	? ? ? ? ? ? ? config-ref="JMS_Config" ackMode="MANUAL"
				? inboundContentType="application/json"
				? destination="${broker.queue.inbound}">
		<jms:consumer-type >
			<jms:queue-consumer />
		</jms:consumer-type>
	</jms:listener>
	<set-variable value="#[attributes.ackId]" doc:name="ackId" variableName="ackId"/>
	<os:retrieve doc:name="Last Printer Id" key="lastPrinterId" objectStore="Object_store" target="lastPrinterId">
		<os:default-value ><![CDATA[#[0]]]></os:default-value>
	</os:retrieve>
	<ee:transform doc:name="Determine Printer to Execute Print Batch Job">
		<ee:message >
		</ee:message>
		<ee:variables >
			<ee:set-variable variableName="printerConfiguration" ><![CDATA[%dw 2.0
output application/java
var printer = "printer" ++ (if (vars.lastPrinterId >= 3) 1 else (vars.lastPrinterId as Number + 1))
---
{
outboundHost: p(printer ++ ".tcp.outboundHost"),
outboundPort: p(printer ++ ".tcp.outboundPort")
}]]></ee:set-variable>
		</ee:variables>
	</ee:transform>
	<ee:transform doc:name="Salesforce Platform Event to COBOL Copybook">
		<ee:message>
			<ee:set-payload resource="dw/SFEvent_to_COBOLCopybook.dwl" />
		</ee:message>
	</ee:transform>
	<sockets:send doc:name="Print Batch Message to Printer" config-ref="Printers_Sockets_Request_config" />
	<os:store doc:name="Last Printer Id" key="lastPrinterId" objectStore="Object_store">
		<os:value ><![CDATA[#[((vars.lastPrinterId as Number) + 1)]]]></os:value>
	</os:store>
	<jms:ack doc:name="Ack" ackId="#[vars.ackId]"/>
</flow>

 _______________________________________________
/                                               \
| DATAWEAVE CODE (SFEvent_to_COBOLCopybook.dwl) |
\_______________________________________________/


%dw 2.0
output application/flatfile schemaPath="batch-print.ffd", segmentIdent="batch-print"

fun getDateAsDDMMYYYY(dateTimeValue) = (
	dw::core::Strings::substring(dateTimeValue, 8, 10) ++ "/" ++
	dw::core::Strings::substring(dateTimeValue, 5, 7) ++ "/" ++
	dw::core::Strings::substring(dateTimeValue, 0, 4)
)

fun getTimeAsHHMMSS(dateTimeValue) = (
	dw::core::Strings::substring(dateTimeValue, 11, 19)
)

fun transformToCopybook(value) = (
	value map ((item, index) -> {
		"batch-row-type": "BATCH_START",
		"batch-id": item.Print_Batch__r.Id,
		"batch-number": item.Print_Batch__r.Number__c default "1",
		"batch-customer-id": item.Print_Batch__r.Account__r.Id,
		"batch-size": item.Print_Batch__r.Quantity__c default "1",
		"batch-date": getDateAsDDMMYYYY(item.Print_Batch__r.CreatedDate),
		"batch-time": getTimeAsHHMMSS(item.Print_Batch__r.CreatedDate),
		"batch-status": item.Status__c default "READY",
		"batch-label-id": item.Id,
		"batch-label-number": item.Number__c default "1",
		"batch-product-id": item.Product__r.Id,
		"batch-label-quantity": item.Quantity__c default "1",
		"batch-label-date": getDateAsDDMMYYYY(item.CreatedDate),
		"batch-label-time": getTimeAsHHMMSS(item.CreatedDate),
		"batch-label-status": item.Status__c default "READY"
	})
)
---
if (payload is Array) (transformToCopybook(payload)) else transformToCopybook([] ++ [ payload ])
        

Printer Batch/Label Status Changed Listener Flow

Finishing our solution implementation, this "listener" flow is responsible for starting a TCP Socket server waiting for COBOL Copybook messages from either of the printers containing status updates of batch or individual labels. Upon receipt of a message in this TCP channel, the flow simply publishes the message as received to the Print (OUT) queue for further processing as seen before in the Salesforce Event publisher flow. Yet again, this very simple and straight forward flow is represented by the below screenshot and code snipped:

No alt text provided for this image
<flow name="printer-io-status-changed-listener-flow">
  <sockets:listener doc:name="TCP server listener"
                    outputMimeType="text/flatfile"
                    config-ref="TCP_Sockets_Listener_config"/>
  <jms:publish doc:name="Message to Print (OUT) Queue"
               config-ref="JMS_Config"
               destination="${broker.queue.outbound}">
    <jms:message outboundContentType='application/flatfile' />
  </jms:publish>	
</flow>        

And we are done! This concludes the implementation part and this series of 3 articles aiming at recaptulating and demonstrating some of the capabilities that MuleSoft offers out of the box, sometimes neglected or not used, but that certainly are still very much needed as seen.

For seeing all of that in action without having the actual printers, I've put together a little Spring Boot application that "plays the role" of the printer for demonstration purposes, which can be downloaded from my GitHub. Otherwise, you can have a little sneak-peek yourself based on the screenshots below from both Salesforce and the little printer app.

No alt text provided for this image
No alt text provided for this image
No alt text provided for this image
No alt text provided for this image
No alt text provided for this image

Awesome! And that's it folks! I hope you have enjoyed and that this can actually benefit you and your company in case you need to face similar challenge and have the need to go back to basics and work with low-lever integrations. Go on, have fun and take your Mule for a ride. Going either high-level or low-level I will leave to your particular needs and capable hands.

Happy integrations!

要查看或添加评论,请登录

Eduardo Ponzoni的更多文章

  • Co-Existence: Key Pillar to Unleash Success in Digital Transformation

    Co-Existence: Key Pillar to Unleash Success in Digital Transformation

    A recent research conducted by IDC, has revealed that between 2020 and 2023, 54% of companies worldwide are going…

  • Using Boomi to Rapidly Improve Customer Experience in Hospitality

    Using Boomi to Rapidly Improve Customer Experience in Hospitality

    Despite the pandemic and economic crisis from recent times, according to travel research institutes, approximately 445…

    4 条评论
  • Seamless ITSM using MuleSoft

    Seamless ITSM using MuleSoft

    Introduction Enterprises from all over the world are spending all their efforts and investing in improving customer…

    6 条评论
  • Low-Level Integrations using MuleSoft - Part 2

    Low-Level Integrations using MuleSoft - Part 2

    During the first part of this article about low-level integrations using MuleSoft, a scenario was presented where a…

    2 条评论
  • Low-Level Integrations using MuleSoft - Part 1

    Low-Level Integrations using MuleSoft - Part 1

    In the modern days that we live in it's almost impossible to think of any type of communication between disparate…

    5 条评论
  • To be or not to be MuleSoft certified

    To be or not to be MuleSoft certified

    “To be or not to be” MuleSoft Certified - That is the question that very often developers and architects ask among…

    3 条评论
  • Unwired API-Led Connectivity

    Unwired API-Led Connectivity

    API-Led Connectivity MuleSoft’s API-led Connectivity is a methodical way to connect and unlock data from systems and…

    1 条评论
  • Test Pyramids in MUnit

    Test Pyramids in MUnit

    Intro During the MuleSoft Summit 2019 in Auckland, New Zealand I have had the absolute pleasure of presenting to the…

    6 条评论

社区洞察

其他会员也浏览了