A useful compromise between Orchestration and Choreography patterns with Fat Events, Recipe, and smart routing.
Today, microservices architecture is one of the most widespread architectural approaches, and it is continuously being developed and improved.
Interaction and coordination between microservices are crucial when building a workflow composed of microservices. The usual requirements for such a workflow combine concerns such as durability, transactionality, extensibility, and loose coupling.
Microservices architecture simultaneously requires a SOLID-like model in which each object is a service implementing part of the distributed logic, while operations may need to be durable and/or transactional. Consequently, a logical control mechanism must be used to arrange the work of the microservices. On the other hand, a distributed logical model composed of microservices must be maximally extendable and decoupled. There are already elements of contradiction here.
The modern approach to microservices architecture has two main paradigms:
The Orchestration pattern.
The orchestration pattern, as the name "orchestra" suggests, is modeled on a real orchestra, where a conductor keeps order in the band's performance.
In the world of microservices, it is the model where a controller encapsulates the business logic of a logical node (subdomain) and calls the services that execute the logic's operations. Normally, the controller gets results from the involved services and handles errors, returning a meaningful result or an error notification.
The model is used mainly when our business logic contains a chain of operations, where the returned result is important and the rest of the operations depend on the result or the error.
In addition, the "orchestration" model frequently performs transactional operations, so attending to the services' response or error is an essential requirement inherent to orchestration.
This model of controlling a set of subdomain operations requires bidirectional communication where any response is handled.
However, the services collaboration model can be "one-directional," meaning that the result does not affect the rest of the chain.
Advantages of the Orchestration method.
The downsides of the Orchestration method.
The Choreography pattern.
The Choreography pattern, as the word "Choreography" suggests, is modeled on a ballet or a collective dance performance, where each actor plays its own part in reaction to the others.
As in a choreography, each service is triggered by a message/event/request from another one. Depending on the type of operation, each of the sequenced services can return a result, an error, or just an acknowledgment, or return nothing at all, relying on the communication channel (such as a Kafka message) to signal message delivery.
The communication channels can be REST, gRPC, Kafka, RabbitMQ, or other messaging. They can be synchronous or asynchronous and can be either unidirectional or bidirectional.
Considering that the Choreography pattern is mainly event/message based (although it can also use REST/gRPC), bidirectional message communication can be realized like this.
A service sends a message to a topic/queue/exchange, which is read by one of several instances of the attending service. Each service instance has a dedicated topic/queue that receives the incoming message.
The same construction can also be used with the Orchestration pattern, but it is more natural for Choreography, as in the sketch below.
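For illustration, here is a minimal sketch of such bidirectional messaging over Kafka, where the reply topic of the sending instance travels in a header; the topic names, the header name, and the payload handling are assumptions, not part of the pattern itself:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

public class ReplyAwareSender {
    private final KafkaProducer<String, byte[]> producer;
    /** reply topic dedicated to this particular service instance (hypothetical name) */
    private final String myReplyTopic = "orders-replies-instance-1";

    public ReplyAwareSender(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
    }

    public void send(String targetTopic, String correlationId, byte[] payload) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(targetTopic, correlationId, payload);
        // the consuming instance reads this header and publishes its answer to the dedicated reply topic
        record.headers().add("reply-to", myReplyTopic.getBytes(StandardCharsets.UTF_8));
        producer.send(record);
    }
}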
In a nutshell: the Domain Logic is distributed among the services of the domain and is realized mainly by a sequence of triggering.
Advantages of Choreography.
This is a nice construction, especially if we want to reduce coupling, maximally reuse microservices, and keep Open Architecture options in mind for extending the system.
However, there are serious challenges.
Downsides of the Choreography pattern:
Problems and possible solution ideas.
Combining ideas 1+2+3 could make the Orchestrator indifferent to the message/event contracts of the involved services, executing only simple operations, like a chef following the recipes of the cooked food.
In this case, the "Orchestrator" loses its value as a real orchestrator, while the Choreography pattern (reaction to the event) continues working.
On the other hand, each business logic service should just handle incoming messages in accordance with a contract that is blind to the business payload of the message and send the response back to the routing component. This aspect is inherent to Choreography.
The FAT Events pattern can be used to find a compromise between Choreography and Orchestration, take their advantages, and significantly reduce their downsides (for a broad spectrum of cases).
The Fat Events pattern is not new and is used for decoupling components of the Domain Logic within the DDD concept.
In a nutshell, a fat event transports data that is not changed during the execution of the corresponding command in the invoked service.
Our concept of the FAT event usage:
We extend the description of Fat Events by assuming that a fat event contains data that is transparent for the service and carries information managed by the replacement of the Orchestrator.
Another aspect of this principle is to free the (let us say) orchestrator service from keeping the state of the distributed workflow (and from taking care of the state's persistence for durability and transactions).
Commands and events are used to trigger services and to send data back.
The command ID and command data of a FAT Message/Event are used for the internal execution in a service. The transparent workflow state is updated (!) and used in the orchestrator.
But that is not enough for our goal: even while the Orchestrator keeps its state in the Command Data, it still has its own business logic.
What else do we want? What is the idea?
Oh, just a small thing: formalize a distributed workflow definition, which will be a workflow recipe.
We just need to define the metadata of the RECIPE, which describes a workflow of service calls, where each call is a stage.
The RECIPE should contain the description of its stages, including which parameters should be taken from the flow state and inserted into the parameters of the command/event of a service call, and the identifiers of the commands. It is used by the replacement of the orchestration mechanism to create and update the workflow state while managing service calls.
The flow state, thus, should keep the identifier of the related flow (to find its recipe), the URI of the client, and a map of parameters to be transmitted as the input and output of service calls.
The state data can be transmitted as a transparent part of an event/command of a call, or kept in external storage.
Defining a workflow recipe.
Let us define the recipe (StageRecipe) that describes a stage of work:
@Data
public class StageRecipe {
    /** status of flow execution */
    public enum ExecutionStatus {progress, success, rollback, failed}
    /** identifier of the command of this stage */
    private String commandId;
    /** URI of the service which handles this stage */
    private String serviceURI;
    /** is the stage transactional */
    private boolean transactional = false;
    /** maps keys of the common flow data onto input parameters of this stage */
    private Map<String,String> inputParamsMapping = new HashMap<>();
    /** maps output parameters of this stage back onto keys of the common flow data */
    private Map<String,String> outputParamsMapping = new HashMap<>();
}
The purpose of most fields is clear.
The inputParamsMapping field contains a key-value map, where the key is a key in the commonFlowData map of the FlowStateData object (which keeps workflow data during execution), and the value is the key under which that value is transmitted in the event/command sent to the service.
outputParamsMapping serves the same purpose for output parameters.
The reason for the mapping: the key of a command parameter in the command data may differ from the key of the same parameter in the parameter map of the whole workflow. We will return to this aspect later.
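To make the mapping concrete, here is a tiny illustration of how an inputParamsMapping entry is applied when a command is built; the key names are hypothetical, and the mechanics mirror the nextRequestInProgress snippet shown later:
// the workflow-wide parameter store (FlowStateData#commonFlowData)
Map<String, Object> commonFlowData = new HashMap<>();
commonFlowData.put("stage2.accountID", "ACC-42");

// StageRecipe#inputParamsMapping: workflow key -> command parameter key
Map<String, String> inputParamsMapping = Map.of("stage2.accountID", "accountID");

// parameters of the command/event that will be sent to the service
Map<String, Object> commandParameters = new HashMap<>();
inputParamsMapping.forEach((flowKey, commandKey) ->
        commandParameters.put(commandKey, commonFlowData.get(flowKey)));
// commandParameters now contains {"accountID": "ACC-42"}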
The StageRecipe, as mentioned, defines a stage of the workflow. It is contained in the list of stages of the FlowRecipe, which describes the whole workflow and is used for forming service calls, managing the workflow, and updating its state.
A workflow recipe defines the workflow process and is used to build the workflow state, the workflow stages, and the command events to be handled by services. Its content is quite transparent:
@Data
public class FlowRecipe {
    /** identifier of the recipe to be used for flow data building */
    private String recipeId;
    /** URI of the router which dispatches the initial request and the response */
    private String recipeRouterURI;
    /** list of stage definitions {@link StageRecipe} */
    private List<StageRecipe> stages = new ArrayList<>();
    /** maps the trigger's input parameters onto keys of the common flow data */
    private Map<String,String> inParamsMap = new HashMap<>();
    /** maps keys of the common flow data onto parameters of the final result */
    private Map<String,String> outParamsMap = new HashMap<>();
}
It has a recipe identifier, which is used by the Orchestrator/Router to build the workflow state and to find the related RECIPE when the state is deserialized from the transparent field of the call being handled.
The second important parameter is recipeRouterURI: the URI of the Orchestrator/Router to which results will be sent.
The FlowRecipe is used to create a FlowStateData instance that keeps the state of the current workflow instance. This is performed by the service that replaces the Orchestrator.
@Data
public class FlowStateData {
    /** identifier of the recipe for the router */
    private String recipeId;
    /** URI of the client which sent the source request */
    private String clientURI;
    /** uuid of the current flow */
    private String uuid = UUID.randomUUID().toString();
    /** error message */
    private String errorMesage;
    /** status of the execution */
    private StageRecipe.ExecutionStatus executionStatus;
    /** flow data map, which contains input, output, and intermediate data referred to by the stage recipes */
    private Map<String,Object> commonFlowData = new HashMap<>();
    /** map of transaction data to be used on rollback */
    private Map<String,Map<String,Object>> transactionalData = new HashMap<>();
    /** index of the current stage */
    private int stage;
}
This state has a field recipeId that links the flow to its recipe, which is used to create the flow state data, generate events/commands to services, and accept results.
Other crucial fields of the FlowStateData:
However, the "Orchestrator" now has no specific business logic. It just performs operations on the data fields of the RECIPE and the state, which are common for any business logic.
Thus, the "Orchestrator" makes no sense as a business subdomain component and becomes just a router, though a smart one. It is no longer related to domain logic and becomes part of the infrastructure layer.
The router is skilled in obtaining FlowRecipe schemas, creating the FlowStateData, which is the state of the projected distributed workflow, creating the StageExecutionCommand to call a service, and processing the StageExecutedEvent.
The Orchestrator becomes a Smart Router, so let's call it the SmartRouter.
It will be displayed in all diagrams accordingly.
By design, the FlowStateData is transported transparently along the workflow execution.
The FlowStateData is created, managed, and used by the SmartRouter.
The SmartRouter creates and routes command events to the services, gets results, and manages the state of the distributed workflow.
The StageExecutionCommand is built by the SmartRouter by following the recipe for the current stage, which is contained in the FlowRecipe. The index of the current stage is kept in the FlowStateData#stage field.
The FlowStateData is used ONLY INTERNALLY in the SmartRouter. Its serialized form is set as a blob into the StageExecutionCommand event data, transferred transparently, and returned by services.
The StageExecutionCommand is created by the SmartRouter using the FlowStateData instance and the StageRecipe that corresponds to the current stage.
The StageExecutionCommand and the StageExecutedEvent each carry a few crucial fields, sketched below. Once the command has been executed by the attending service, the StageExecutedEvent is sent back to the SmartRouter.
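The article does not list these two classes explicitly; the sketch below is reconstructed from the field names that appear in the code snippets and JSON examples (in the article they apparently share a common parent, FlowExchangeDTO, omitted here), so treat the exact shapes as assumptions:
@Data
public class StageExecutionCommand {
    /** command identifier of the stage (the StageRecipe#commandId) */
    private String operation;
    /** uuid of the workflow instance */
    private String uuid;
    /** parameters mapped from the common flow data for this stage */
    private Map<String, Object> parameters = new HashMap<>();
    /** serialized FlowStateData, transported transparently */
    private byte[] blob;
    /** transaction data, if any */
    private Map<String, Object> transactionData = new HashMap<>();
    /** destination URI, copied from StageRecipe#serviceURI */
    private String targetUrI;
    /** execution status that caused the command (progress or rollback) */
    private StageRecipe.ExecutionStatus reason;

    /** constructor used by the SmartRouter snippets; the argument order is an assumption */
    public StageExecutionCommand(byte[] blob, String operation, String uuid) {
        this.blob = blob;
        this.operation = operation;
        this.uuid = uuid;
    }
}

@Data
public class StageExecutedEvent {
    /** command identifier that was executed */
    private String operation;
    /** uuid of the workflow instance */
    private String uuid;
    /** output parameters returned by the service */
    private Map<String, Object> parameters = new HashMap<>();
    /** the transparent blob, returned unchanged */
    private byte[] blob;
    /** data needed to compensate the stage on rollback */
    private Map<String, Object> transactionData = new HashMap<>();
    /** error code/message if the execution failed */
    private String errorCode;
}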
The SmartRouter builds the StageExecutionCommand#parameters by getting parameters from FlowStateData#commonFlowData, using the key mapping rules from StageRecipe#inputParamsMapping.
A remark about parameter mapping between FlowStateData#commonFlowData and StageExecutionCommand#parameters.
The above-described mapping procedure is useful because the key names in FlowStateData#commonFlowData may be composite (such as stage2.accountID), while StageExecutionCommand#parameters can be mapped onto a data class of a service's business logic. Besides, the naming convention of FlowStateData#commonFlowData, which is an internal object of the SmartRouter (and does not belong to a business domain), should not be coupled with the parameter names of a concrete service: the service's call parameter names should have a business reason.
The StageExecutionCommand is built in the SmartRouter, involving objects used only for this construction: the FlowStateData and the StageRecipe. But only the StageRecipe is kept permanently (in the FlowRecipe).
When the execution result is ready, the StageExecutedEvent is built on the target Service's behalf.
As we see, the FlowStateData is transported transparently through the call and is used by the SmartRouter while processing the StageExecutedEvent that has been sent by a service of the workflow.
The code snippet below demonstrates a simple example of building the StageExecutionCommand in the SmartRouter using the FlowStateData and FlowRecipe instances:
private static StageExecutionCommand nextRequestInProgress(FlowStateData data, FlowRecipe recipe) {
    List<StageRecipe> stages = recipe.getStages();
    // no more stages to execute
    if (stages.size() <= data.getStage()) {
        return null;
    }
    // advance to the next stage and pick its recipe
    int stageId = data.next().getStage();
    StageRecipe nextRecipe = stages.get(stageId);
    // serialize the flow state into the transparent blob
    byte[] serialized = serialize(data, getExternalInfo());
    StageExecutionCommand result = new StageExecutionCommand(serialized, nextRecipe.getCommandId(), data.getUuid());
    result.setReason(data.getExecutionStatus());
    result.setOperation(nextRecipe.getCommandId());
    result.setTargetUrI(nextRecipe.getServiceURI());
    // map the common flow data onto the command parameters using the stage's input mapping
    Map<String, Object> nextData = data.getCommonFlowData();
    Map<String, Object> actualData = result.getParameters();
    nextRecipe.getInputParamsMapping().forEach((k, v) -> actualData.put(v, nextData.get(k)));
    return result;
}
This is an example of the StageExecutedEvent handling in the SmartRouter.
/**
 * Handles the response for an operation from the attending service.
 *
 * @param result  result of the last processed request
 * @param dataExt flow state restored from external storage (may be null)
 * @param recipe  recipe of the processed flow
 * @return an instance of FlowExchangeDTO: either the next StageExecutionCommand,
 *         the FinalResponse, or the source event when the rollback has reached stage 0
 */
public static FlowExchangeDTO processResponse(StageExecutedEvent result, FlowStateData dataExt, FlowRecipe recipe) {
    // restore the transparent flow state from the blob if it was not loaded externally
    if (dataExt == null)
        dataExt = fromSerialized(result);
    FlowStateData data = dataExt;
    if (result.getErrorCode() == null && (data.getExecutionStatus().equals(success) || data.getExecutionStatus().equals(progress))) {
        // map the returned parameters back onto the common flow data
        Map<String, Object> params = result.getParameters();
        recipe.getStages().get(data.getStage()).getOutputParamsMapping().forEach((k, v) -> {
            Object back = params.get(k);
            if (back == null)
                return;
            data.getCommonFlowData().put(v, back);
        });
        // the last stage has completed: build the final response for the client
        if (data.getStage() == recipe.getStages().size() - 1) {
            data.setExecutionStatus(success);
            Map<String, Object> map = new HashMap<>();
            recipe.getOutParamsMap().forEach((k, v) -> {
                map.put(v, data.getCommonFlowData().get(k));
            });
            FinalResponse response = new FinalResponse(result, data.getClientURI(), map);
            response.setOperation(recipe.getRecipeId());
            return response;
        }
        // keep the transaction data and generate the command for the next stage
        data.getTransactionalData().put(result.getOperation(), result.getTransactionData());
        return generateRequest(data, recipe);
    }
    // rollback has reached the first stage: nothing more to compensate
    if (data.getExecutionStatus().equals(rollback)) {
        if (data.getStage() == 0) {
            return result;
        }
    }
    // a stage failed: search backwards for a transactional stage to compensate
    data.setExecutionStatus(failed);
    int currStage = data.getStage();
    for (int i = currStage; i >= 0; i--) {
        StageRecipe sr = recipe.getStages().get(i);
        if (sr.isTransactional()) {
            data.setStage(i);
            data.setExecutionStatus(rollback);
            return generateRequest(data, recipe);
        }
    }
    // no transactional stage found: report the failure to the client
    return new FinalResponse(result, data.getClientURI(), null);
}
The handling code performs very simple operations.
So the SmartRouter does not even need to keep the state of the distributed workflow. Furthermore, it knows nothing about what is done in the distributed workflow, which is exactly what we want.
As we see, each successful step increments the stage by 1 and, if there are still stages to be executed in the distributed workflow, gets the related StageRecipe and builds the StageExecutionCommand.
If no steps remain, the result is returned to the initiator of the workflow instance.
If a command execution is unsuccessful, the stage is decremented, the status is set to rollback, and a new rollback command is generated to roll back the distributed flow.
The SmartRouter uses the StageExecutionCommand#targetUrI parameter as the destination URI to send the data to. The parameter is copied from StageRecipe#serviceURI.
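For instance, if the transport is Kafka (as assumed in the next paragraph), the routing step could look like the following sketch, where topics are named by the target URI; the producer setup and the JSON serialization are assumptions:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class CommandDispatcher {
    private final KafkaProducer<String, byte[]> producer;
    private final ObjectMapper jsonMapper = new ObjectMapper();

    CommandDispatcher(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
    }

    void dispatch(StageExecutionCommand command) throws Exception {
        // the target topic comes from the recipe (copied into targetUrI)
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                command.getTargetUrI(),               // topic per StageRecipe#serviceURI
                command.getUuid(),                    // key: keeps one workflow on one partition
                jsonMapper.writeValueAsBytes(command));
        producer.send(record);
    }
}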
Assuming that the SmartRouter and the Services communicate via a message queue (Kafka, for example), let us see the whole picture.
As we see, there are two separate data flows in the interactions while the distributed flow progresses.
The business logic of any Service knows nothing about the business logic of the other Services, the current workflow, where the result will be directed, or which service will be triggered next.
The SmartRouter, as mentioned, knows nothing about which service will be called or about the contract of the command message.
Full Workflow Sequence.
Update the SmartRouter service with the FlowRecipes for the expected workflows.
Any Configuration Service sends the FlowRecipe list to the SmartRouter, which puts it into the SmartRouter's RecipeStorage.
Triggering the workflow in the SmartRouter:
A client Service sends an execution request, a FlowTriggerEvent, to the SmartRouter, which retrieves the related FlowRecipe from the Recipe Storage.
The SmartRouter uses the extracted FlowRecipe to create the FlowStateData, which will be used to generate the StageExecutionCommand for each stage along the workflow's progress.
A sample code snippet:
/**
 * The trigFlow method processes the incoming triggering event {@link FlowTriggerEvent},
 * retrieves the related {@link FlowRecipe} from the storage map, and returns the first {@link StageExecutionCommand}.
 * @param recipeMap - map of recipes by ID
 * @param flowTriggerEvent - instance of the {@link FlowTriggerEvent} which contains the data necessary to create the workflow
 * @return - {@link StageExecutionCommand} command to trigger the first stage of the workflow
 */
public StageExecutionCommand trigFlow(Map<String, FlowRecipe> recipeMap, FlowTriggerEvent flowTriggerEvent) {
    FlowRecipe recipe = recipeMap.get(flowTriggerEvent.getOperation());
    if (recipe == null)
        return null;
    // create a fresh flow state for this workflow instance
    FlowStateData data = new FlowStateData();
    data.setClientURI(flowTriggerEvent.getClientUri());
    data.setUuid(flowTriggerEvent.getUuid());
    // map the trigger parameters onto the common flow data using the recipe's input mapping
    Map<String, Object> mapData = new HashMap<>();
    data.setCommonFlowData(mapData);
    recipe.getInParamsMap().forEach((k, v) -> {
        mapData.put(v, flowTriggerEvent.getParameters().get(k));
    });
    return generateRequest(data, recipe);
}
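The FlowTriggerEvent class itself is not shown in the article; a minimal sketch consistent with the getters used above and with the trigger JSON shown later (the exact field set is an assumption):
@Data
public class FlowTriggerEvent {
    /** identifier of the requested workflow (matches FlowRecipe#recipeId), e.g. "buyShares" */
    private String operation;
    /** correlation uuid of the workflow instance */
    private String uuid;
    /** input parameters of the workflow */
    private Map<String, Object> parameters = new HashMap<>();
    /** URI of the client to which the final response should be sent */
    private String clientUri;
}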
The addressed Service processes the command.
The Service that accepts the generated StageExecutionCommand executes it and forms the StageExecutedEvent. The StageExecutedEvent obtains the blob from the StageExecutionCommand and is then returned to the SmartRouter.
Now the SmartRouter accepts and handles the StageExecutedEvent and updates the FlowStateData.
The incoming StageExecutedEvent brings the return parameters and the blob containing the serialized FlowStateData.
The FlowStateData always contains the identifier of the related FlowRecipe. The SmartRouter gets the return data from the event and maps it onto FlowStateData#commonFlowData, computes the next stage in FlowStateData#stage, retrieves the proper StageRecipe, creates a new StageExecutionCommand, maps FlowStateData#commonFlowData onto StageExecutionCommand#parameters, and sends the command to the target service, as sketched below.
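Putting the pieces together, the SmartRouter's handling loop could be sketched as follows; trigFlow, processResponse, and fromSerialized are the methods shown in this article, while recipeStorage, dispatch, and sendToClient are hypothetical helpers:
// map of recipes received from the configuration service
private Map<String, FlowRecipe> recipeStorage = new HashMap<>();

void onFlowTriggerEvent(FlowTriggerEvent trigger) {
    StageExecutionCommand first = trigFlow(recipeStorage, trigger);
    if (first != null) {
        dispatch(first); // send to first.getTargetUrI(), e.g. a Kafka topic
    }
}

void onStageExecutedEvent(StageExecutedEvent event) {
    FlowStateData state = fromSerialized(event);             // restore the transparent state
    FlowRecipe recipe = recipeStorage.get(state.getRecipeId());
    FlowExchangeDTO next = processResponse(event, state, recipe);
    if (next instanceof StageExecutionCommand) {
        dispatch((StageExecutionCommand) next);               // trigger the next stage
    } else {
        sendToClient(next);                                    // FinalResponse or rollback result
    }
}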
The whole process scheme:
BENEFITS
Downsides:
An example of the command-data contract implementation is a simple API.
The interface exposed for creating a service's command handling:
public interface ICommandHandlingHub {
    /**
     * Registers a command executor: the business logic function and its compensator.
     * @param command - command identifier
     * @param inType - input type
     * @param executor - executor function
     * @param transactionCompensator - compensator which should be called when the status is {@link StageRecipe.ExecutionStatus#rollback}
     * @param <IN> input parameters type
     * @param <OUT> output type
     */
    <IN,OUT> void addCommandHandling(String command, Class<IN> inType, BiFunction<IN, Map<String,Object>, OUT> executor, Consumer<Map<String,Object>> transactionCompensator);

    /**
     * Executes the business logic and forms the result as an event.
     * @param command - command to be executed
     * @return - a reactor Mono which emits the resulting {@link StageExecutedEvent}
     */
    Mono<StageExecutedEvent> execute(StageExecutionCommand command);
}
It exposes methods for registering a command handler and for executing an incoming command.
Implementation is quite straightforward.
public enum CommandHandlingHub implements ICommandHandlingHub
{
inventory{
/** map of required handlers per command */
Map<String, Triple<Class,BiFunction,Consumer>> mapper=new HashMap<>();
@Override
public <IN, OUT> void addCommandHandling(String command, Class<IN> inType, BiFunction<IN, Map<String, Object>, OUT> executor, Consumer<Map<String, Object>> compensator) {
mapper.put(command, new Triple<Class, BiFunction, Consumer>() {
@Override
public Class getLeft() {
return inType;
}
@Override
public BiFunction getMiddle() {
return executor;
}
@Override
public Consumer getRight() {
return compensator;
}
});
}
@Override
public Mono<StageExecutedEvent> execute(StageExecutionCommand command) {
Triple<Class,BiFunction,Consumer> triple= mapper.get(command.getOperation());
return StageExecutionProcedure.prepareCommandExecution(command,triple.getLeft(), triple.getMiddle(),triple.getRight()).andExecute();
}
}
}
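A possible consuming side of a service, assuming it receives StageExecutionCommands from its queue and publishes the resulting events back to the router (publishToRouter is a hypothetical helper):
// called by the service's message listener for each incoming command
void onCommandReceived(StageExecutionCommand command) {
    CommandHandlingHub.inventory
            .execute(command)                            // run the registered business logic
            .subscribe(event -> publishToRouter(event)); // return the StageExecutedEvent to the SmartRouter
}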
The execution procedure builder is also straightforward:
/**
*
* @param <IN> input params type
* @param <OUT> output type
*/
public class StageExecutionProcedure<IN,OUT> {
/** JSON mapper */
private static final ObjectMapper theMapper = new ObjectMapper();
/** type to convert to map */
private static final TypeReference<Map<String,Object>> mapType= new TypeReference<Map<String,Object>>() {};
public static final String UNEXPECTED_STATUS = "unexpected status";
/** event to be updated by execution result and returned */
StageExecutedEvent event = new StageExecutedEvent();
/** processing function, which gets the input parameter and an in-out map that accepts transactional data,
 * and returns the processing result */
private BiFunction<IN,Map<String,Object>,OUT> procedure;
/** compensation consumer, which will be called when rollback is the status */
private Consumer<Map<String, Object>> compensator;
/** source command */
private StageExecutionCommand command;
/** input type */
private Class<IN> inType;
/**
* creates instance of a command executor
* @param command - source command
* @param inType - input type
* @param executor - executor function
* @param transactionCompensator - compensator which should be called when the status is {@link StageRecipe.ExecutionStatus#rollback}
* @return instance of the class
* @param <IN>
* @param <OUT>
*/
public static <IN,OUT> StageExecutionProcedure<IN,OUT> prepareCommandExecution(StageExecutionCommand command, Class<IN> inType, BiFunction<IN,Map<String,Object>,OUT> executor, Consumer<Map<String,Object>> transactionCompensator){
StageExecutionProcedure<IN,OUT> executionProcedure = new StageExecutionProcedure<>();
executionProcedure.event.setOperation(command.getOperation());
executionProcedure.event.setUuid(command.getUuid());
executionProcedure.command= command;
executionProcedure.procedure= executor;
executionProcedure.inType= inType;
executionProcedure.compensator= transactionCompensator;
return executionProcedure;
}
/**
* executes the prepared execution procedure
* @return - the resulting StageExecutedEvent wrapped in a Mono
*/
Mono<StageExecutedEvent> andExecute(){
    Mono<StageExecutedEvent> res = Mono.create((ms) -> {
        String[] errorMessage = new String[1];
        Map<String,Object> out = null;
        Map<String,Object> rollbackParams = new HashMap<>();
        switch (command.getReason()) {
            case progress -> {
                // normal execution of the stage's business logic
                out = tryExecute(rollbackParams, errorMessage);
            }
            case rollback -> {
                // compensate the previously executed transactional stage
                rollback(command.getParameters(), errorMessage);
            }
            default -> {
                errorMessage[0] = UNEXPECTED_STATUS;
            }
        }
        event.setErrorCode(errorMessage[0]);
        event.setTransactionData(rollbackParams);
        // return the transparent state blob untouched
        event.setBlob(command.getBlob());
        if (out != null)
            event.getParameters().putAll(out);
        ms.success(event);
    });
    return res;
}
/**
 * executes the service's business logic, as a BiFunction
 * @param rollbackParams - map which accepts the parameters of the transaction, if required
 * @param errorMessage - reference to the error message
 * @return - output parameters in the form of a map
 */
private Map<String,Object> tryExecute(Map<String,Object> rollbackParams, String[] errorMessage){
    try {
        // convert the command parameters into the service's input type and run the business logic
        IN in = theMapper.convertValue(command.getParameters(), inType);
        return theMapper.convertValue(procedure.apply(in, rollbackParams), mapType);
    }
    catch (Throwable t){
        errorMessage[0] = t.getMessage();
        return null;
    }
}

/** calls the compensator to roll the transactional stage back */
private void rollback(Map<String,Object> rollbackParams, String[] errorMessage){
    try {
        compensator.accept(rollbackParams);
    }
    catch (Throwable t){
        errorMessage[0] = t.getMessage();
    }
}
}
Fighting a potentially large size of state data.
As mentioned, the state data might be large, and transporting it transparently in the StageExecutionCommand#blob and the StageExecutedEvent#blob could be problematic for Kafka, for example.
Remember that Kafka messages are transferred and replicated between brokers over TCP/IP, and Kafka is tuned for relatively small messages (on the order of a kilobyte). While Kafka is a throughput champion, growth of the message size leads to performance degradation. The FlowStateData could be large, mainly because the size of FlowStateData#commonFlowData can grow due to the size of the values kept in it. Consequently, the blob field grows, and with it the command and event messages.
What is the solution? The first and easiest way is to keep the large portion of the data in external storage. This approach is commonly used with Kafka, where large messages are transferred as references (URIs) to external storage such as S3, EFS, Redis, and so on.
Considering our goals, we will put a reference to Redis into StageExecutionCommand#blob and StageExecutedEvent#blob.
@Data
public class DataURI {
    /** the marker which prefixes serialized data of this class */
    static public String MARKER = ":data_uri_marker->";
    /** identifier of the recipe */
    String recipeID;
    /** a URI in a concrete external data mechanism (an RMap name, for instance) */
    String uri;
    /** entry in the URI */
    String entry;
}
This class points to the location of the real data; it will be serialized into the blob field (and deserialized accordingly).
An instance of the class is created and bound to a concrete FlowRecipe instance using the recipeID parameter.
Now, suppose that we define the interface:
public interface IExternalStateInfo {
/** adds info */
void addInfo(DataURI data);
/** get info by identifier of recipe */
DataURI get(String id);
}
which accepts DataURI instances and maps them using DataURI#recipeID as the key.
Like this:
static private IExternalStateInfo stateInfo = new IExternalStateInfo() {
Map<String,DataURI> infoMap = new HashMap<>();
@Override
public void addInfo(DataURI info) {
infoMap.put(info.recipeID,info);
}
@Override
public DataURI get(String id) {
return infoMap.get(id);
}
};
The DataURI information is also sent by the configuration service and is accepted and mapped by the SmartRouter service.
The service will keep the FlowStateData in the external storage if a DataURI instance is mapped for the current FlowStateData#recipeId.
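The serializers below rely on a Cache helper that the article does not show; a minimal sketch backed by a Redisson RMap (the class shape, the singleton access, and the Redis address are assumptions suggested by the "RMap name" hint in DataURI) could look like this:
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public final class Cache {
    private static final Cache INSTANCE = new Cache();
    private final RedissonClient redisson;

    private Cache() {
        Config config = new Config();
        // the Redis address is a placeholder assumption
        config.useSingleServer().setAddress("redis://localhost:6379");
        this.redisson = Redisson.create(config);
    }

    public static Cache instance() {
        return INSTANCE;
    }

    /** stores the flow state under the map named 'uri' and the key 'entry' */
    public void addObject(String uri, String entry, FlowStateData data) {
        RMap<String, FlowStateData> map = redisson.getMap(uri);
        map.put(entry, data);
    }

    /** retrieves the flow state kept under the map named 'uri' and the key 'entry' */
    public FlowStateData retrieve(String uri, String entry) {
        RMap<String, FlowStateData> map = redisson.getMap(uri);
        return map.get(entry);
    }
}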
We only need to create serializers of FlowStateData that take this into account:
private static FlowStateData fromSerialized(StageExecutedEvent event){
    String strBlob = new String(event.getBlob());
    if (strBlob.startsWith(DataURI.MARKER)){
        // the blob is only a reference: strip the marker and load the state from external storage
        strBlob = strBlob.substring(DataURI.MARKER.length());
        DataURI info = IJSONSerializeable.load(strBlob, DataURI.class);
        return Cache.instance().retrieve(info.uri, info.entry);
    }
    // otherwise the blob contains the serialized state itself
    return IJSONSerializeable.load(strBlob, FlowStateData.class);
}
private static byte[] serialize(FlowStateData data, IExternalStateInfo info){
    DataURI dataInfo = info.get(data.getRecipeId());
    // no external storage configured for this recipe: serialize the state itself
    if (dataInfo == null)
        return IJSONSerializeable.serialize(data);
    // otherwise store the state externally and serialize only the marked reference
    Cache.instance().addObject(dataInfo.getUri(), dataInfo.getEntry(), data);
    return IJSONSerializeable.serialize(DataURI.MARKER, dataInfo);
}
And use them in the internal transformations:
private static StageExecutionCommand nextRequestInProgress(FlowStateData data, FlowRecipe recipe) {
    List<StageRecipe> stages = recipe.getStages();
    // no more stages to execute
    if (stages.size() <= data.getStage()) {
        return null;
    }
    // advance to the next stage and pick its recipe
    int stageId = data.next().getStage();
    StageRecipe nextRecipe = stages.get(stageId);
    // serialize the flow state: either into the blob itself or as a reference to external storage
    byte[] serialized = serialize(data, getExternalInfo());
    StageExecutionCommand result = new StageExecutionCommand(serialized, nextRecipe.getCommandId(), data.getUuid());
    result.setReason(data.getExecutionStatus());
    result.setOperation(nextRecipe.getCommandId());
    result.setTargetUrI(nextRecipe.getServiceURI());
    // map the common flow data onto the command parameters using the stage's input mapping
    Map<String, Object> nextData = data.getCommonFlowData();
    Map<String, Object> actualData = result.getParameters();
    nextRecipe.getInputParamsMapping().forEach((k, v) -> actualData.put(v, nextData.get(k)));
    return result;
}
What do we win when using external data storage?
It is useful when leveraging the event-driven model over Kafka, to avoid throughput degradation.
Does this principle increase performance in general or save CPU time by reducing the serialization size? No. Using any external storage implies serialization by definition. So the only gain here is solving the message-size problem of the messaging framework.
An example of the RECIPE, FAT EVENTS, and SmartRouter realization.
Let us suppose that we have a group of microservices that sell stock shares upon request. There are shares sellers (owners) and buyers.
Any user (seller or buyer) has a money account, where funds are kept, and a share-holding account, which keeps all purchased shares.
A seller publishes shares for buyers; a share package can be queried by the share ID and the cost amount.
So, there are the following services, each with its own database:
The Money Account Service manages the participants' funds.
The Shares-for-Sale Service queries shares by identifier and package cost.
The Share Account Service manages the participants' shares.
A share purchase is realized by a flow of these services' operations.
First, let us define the recipe that describes the flow and contains the stages, the input and output parameter mappings, and the map of common storage parameters:
{
"recipeId": "buyShares",
"recipeRouterURI": "routerQ",
"stages": [
{
"commandId": "findShares",
"serviceURI": "queryQ",
"transactional": false,
"inputParamsMapping": {
"deal.shareID": "shareID",
"deal.amount": "amount"
},
"outputParamsMapping": {
"amount": "deal.amount",
"shareID": "deal.shareID",
"ownerID": "deal.ownerID"
}
},
{
"commandId": "lockFunds",
"serviceURI": "moneyAccountQ",
"transactional": false,
"inputParamsMapping": {
"deal.buyerID": "buyerID",
"deal.amount": "amount"
},
"outputParamsMapping": {
"buyerID": "deal.buyerID",
"locked": "deal.lockedFunds"
}
},
{
"commandId": "lockShares",
"serviceURI": "shareAccountQ",
"transactional": false,
"inputParamsMapping": {
"deal.ownerID": "ownerID",
"deal.amount": "amount"
},
"outputParamsMapping": {
"ownerID": "deal.ownerID",
"locked": "deal.lockedShares"
}
},
{
"commandId": "transferFunds",
"serviceURI": null,
"transactional": false,
"inputParamsMapping": {
"deal.ownerID": "ownerID",
"deal.buyerID": "buyerID",
"deal.lockedFunds": "locked",
"deal.amount": "amount"
},
"outputParamsMapping": {
"locked": "deal.lockedFunds"
}
},
{
"commandId": "transferShares",
"serviceURI": null,
"transactional": false,
"inputParamsMapping": {
"deal.ownerID": "ownerID",
"deal.buyerID": "buyerID",
"deal.amount": "amount"
},
"outputParamsMapping": {
"locked": "deal.lockedShare"
}
}
],
"inParamsMap": {
"shares": "deal.shareID",
"clientID": "deal.buyerID",
"sum": "deal.amount"
},
"outParamsMap": {
"deal.shareID": "shares",
"deal.ownerID": "from",
"deal.buyerID": "clientID",
"deal.amount": "sum"
}
}
The recipe defines the input and output parameters of the workflow trigger, their mapping onto the parameter storage, the mapping of common parameters onto the service call contracts, the mapping of the output parameters returned by services back onto the common parameters, and the mapping of the necessary common parameters onto the final result.
Let us send the trigger to the router:
"operation": "buyShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"shares": "Coca-Cola_123",
"clientID": "[email protected]",
"sum": 1200000.0
},
"clientUri": "uri"
}
The SmartRouter, by the definition of stage 0, returns the following command:
{
"operation": "findShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"shareID": "Coca-Cola_123"
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6e30sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjB9",
"transactionData": {},
"targetUrI": "queryQ",
"reason": "progress"
}
The Shares-for-Sale Service has a handler for the command
CommandHandlingHub.inventory.addCommandHandling(FindSharesStageRecipe.COMMAND, FindShares.class, (in, mapTx) -> {
ShareFound found = new ShareFound();
found.setOwnerID("[email protected]");
found.setShareID(in.getShareID());
found.setAmount(in.getAmount());
return found;
}, null);
and triggers the following event once the command is processed:
{
"operation": "findShares",
"uuid":"f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"shareID": "Coca-Cola_123",
"amount": 1200000.0,
"ownerID": "[email protected]"
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6e30sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjB9",
"transactionData": {},
"errorCode": null
}
The owner ID of the shares is returned, and it will be kept in the state data.
The SmartRouter handles the event, advances the stage number, and the next command is:
{
"operation": "lockFunds",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"buyerID": "[email protected]"
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6eyJmaW5kU2hhcmVzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjF9",
"transactionData": {},
"targetUrI": "moneyAccountQ",
"reason": "progress"
}
The FlowStateData, which is serialized in the blob, has the value:
{
"recipeId": "buyShares",
"clientURI": "uri",
"errorMesage": null,
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"commonFlowData": {
"deal.shareID": "Coca-Cola_123",
"deal.buyerID": "[email protected]",
"deal.amount": 1200000.0,
"deal.ownerID": "[email protected]"
},
"transactionalData": {
"findShares": {}
},
"executionStatus": "progress",
"stage": 1
}
Now it contains the deal.ownerID value, which will be used in subsequent operations after mapping onto the related service call contracts.
The "lockFunds" command is sent to the Money Account Manager service, where the following handler is defined:
CommandHandlingHub.inventory.addCommandHandling(LockFundsStageRecipe.COMMAND, LockFunds.class, (in, mapTx) -> {
FundsLocked locked = new FundsLocked();
locked.setBuyerID(in.getBuyerID());
locked.setLocked(in.getAmount());
return locked;
}, null);
It triggers the following event to the SmartRouter once the command is handled:
{
"operation": "lockFunds",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"buyerID": "[email protected]",
"locked": 1200000.0
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiNGI4MTBmZmQtOTdmOS00OGFlLTlmZTUtNGM1Y2IwODgzZTgzIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIn0sInRyYW5zYWN0aW9uYWxEYXRhIjp7ImZpbmRTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6MX0=",
"transactionData": {},
"errorCode": null
}
The SmartRouter handles the event and, using the related StageRecipe, updates FlowStateData#commonFlowData
{
"recipeId": "buyShares",
"clientURI": "uri",
"errorMesage": null,
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"commonFlowData": {
"deal.shareID": "Coca-Cola_123",
"deal.buyerID": "[email protected]",
"deal.amount": 1200000.0,
"deal.ownerID": "[email protected]",
"deal.lockedFunds": 1200000.0
},
"transactionalData": {
"findShares": {},
"lockFunds": {}
},
"executionStatus": "progress",
"stage": 2
}
, progresses to the next stage, and generates the new command:
{ "operation": "lockShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"ownerID": "[email protected]"
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiZDUxMDg1Y2EtNjlkMC00ZjI2LTljYTktNzMyYTgwMzJmOTM0IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e319LCJleGVjdXRpb25TdGF0dXMiOiJwcm9ncmVzcyIsInN0YWdlIjoyfQ==",
"transactionData": {},
"targetUrI": "shareAccountQ",
"reason": "progress"
}
The command is sent to the Share Account Managing Service.
The Share Account Service has the following handler for this command:
CommandHandlingHub.inventory.addCommandHandling(LockShareStageRecipe.COMMAND, LockShares.class, (in, mapTx) -> {
SharesLocked locked= new SharesLocked();
locked.setLocked(in.getAmount());
locked.setOwnerID(in.getOwnerID());
return locked;
}, null);
The service triggers back the following event once the command is processed.
{
"operation": "lockShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"ownerID": "[email protected]",
"locked": 1200000.0
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYmQ1ZWU2ODMtNTg5NS00OTliLWJkZDUtZTQ0MzkzYjhiY2YyIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e319LCJleGVjdXRpb25TdGF0dXMiOiJwcm9ncmVzcyIsInN0YWdlIjoyfQ==",
"transactionData": {},
"errorCode": null
}
The SmartRouter processes this event and updates the FlowStateData to the value:
{
"recipeId": "buyShares",
"clientURI": "uri",
"errorMesage": null,
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"commonFlowData": {
"deal.shareID": "Coca-Cola_123",
"deal.buyerID": "[email protected]",
"deal.amount": 1200000.0,
"deal.ownerID": "[email protected]",
"deal.lockedFunds": 1200000.0,
"deal.lockedShares": 1200000.0
},
"transactionalData": {
"findShares": {},
"lockFunds": {},
"lockShares": {}
},
"executionStatus": "progress",
"stage": 3
}
moves to the next StageRecipe, and forms the new command:
{
"operation": "transferFunds",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"ownerID": "[email protected]",
"buyerID": "[email protected]",
"locked": 1200000.0
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYmQ1ZWU2ODMtNTg5NS00OTliLWJkZDUtZTQ0MzkzYjhiY2YyIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6M30=",
"transactionData": {},
"targetUrI": "moneyAccountQ",
"reason": "progress"
}
The command is sent to the Money Account Manager Service, where the related handler is defined:
CommandHandlingHub.inventory.addCommandHandling(TransferFundsStageRecipe.COMMAND, TransferFunds.class, (in, mapTx) -> {
FundsTransferred transferred= new FundsTransferred();
transferred.setOwnerID(in.getOwnerID());
transferred.setBuyerID(in.getBuyerID());
transferred.setAmount(in.getAmount());
transferred.setLocked(0);
return transferred;
}, null);
The service fires the following event once the handler has executed:
{
"operation": "transferFunds",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"buyerID": "[email protected]",
"ownerID": "[email protected]",
"locked": 0.0
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTQ4ZmQzZjAtOWFkNi00ODI0LThmODItZDUwZjUzNGU1NTlkIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6M30=",
"transactionData": {},
"errorCode": null
}
The SmartRouter handles this event, progresses to the last stage, and updates the FlowStateData:
{
"recipeId": "buyShares",
"clientURI": "uri",
"errorMesage": null,
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"commonFlowData": {
"deal.shareID": "Coca-Cola_123",
"deal.buyerID": "[email protected]",
"deal.amount": 1200000.0,
"deal.ownerID": "[email protected]",
"deal.lockedFunds": 0.0,
"deal.lockedShares": 1200000.0
},
"transactionalData": {
"findShares": {},
"lockFunds": {},
"lockShares": {},
"transferFunds": {}
},
"executionStatus": "progress",
"stage": 4
}
and returns the next and last command in our workflow (stage == 4!):
{
"operation": "transferShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"ownerID": "[email protected]",
"buyerID": "[email protected]"
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiNjlhM2FlOTMtY2YyYi00MDQ0LWI4ZDQtMWVkNzk5ZDQ5ZGU0IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9LCJ0cmFuc2ZlckZ1bmRzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjR9",
"transactionData": {},
"targetUrI": "shareAccountQ",
"reason": "progress"
}
The Share Account Service has the handler for this command:
CommandHandlingHub.inventory.addCommandHandling(TransferSharesStageRecipe.COMMAND, TransferShare.class, (in, mapTx) -> {
ShareTransferred transferred = new ShareTransferred();
transferred.setAmount(in.getAmount());
transferred.setOwnerID(in.getOwnerID());
transferred.setBuyerID(in.getBuyerID());
transferred.setLocked(0);
return transferred;
}, null);
The service processes the command with this handler and fires back this event to the SmartRouter:
{
"operation": "transferShares",
"uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
"parameters": {
"amount": 1200000.0,
"buyerID": "[email protected]",
"ownerID": "[email protected]",
"locked": 0.0
},
"blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiZjM0YjM5ZDQtN2QzMi00ZTFhLTg4MGEtYjlhMzAyZTQ2ZTRkIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9LCJ0cmFuc2ZlckZ1bmRzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjR9",
"transactionData": {},
"errorCode": null
}
The SmartRouter handles this event and, because there are no more stages, returns the final result to the caller:
{
"operation": "buyShares",
"uuid": "4531692c-c580-40d4-b30f-3906bfa6b090",
"parameters": {
"shares": "Coca-Cola_123",
"clientID": "[email protected]",
"from": "[email protected]",
"sum": 1200000.0
}
}
More complicated cases of RECIPE with FAT EVENTS AND ROUTING.
We considered only the simplest cases of business logic, where the next stage depends only on the success or failure of the current stage. These cases encompass a broad spectrum of real industrial tasks. However, there can be more complicated situations, where the next stage depends on the result of the current stage: for example, on the type or status of a client's account.
Could we handle such cases? Yes, we can. We would need to use rules in the Recipe. But continuing the theme here would make this article too long; I'll try to demonstrate it in the next one.