A useful compromise between Orchestration and Choreography patterns with Fat Events, Recipe, and smart routing.

Today, microservices architecture is one of the most widespread architectural styles, and it continues to be developed and improved.

Interaction and coordination between microservices are crucial when building a workflow composed of microservices. The usual requirements for such a workflow include any combination of the following:

  • call microservice
  • handle results of a microservice call
  • provide composite business logic
  • handle errors
  • perform transactions
  • provide call durability (recovery after error)
  • loose coupling
  • simple extendability
  • clear logging.

At the same time, a microservices architecture needs a SOLID-like model in which each object is a service that implements part of a distributed logic, while its operations may have to be durable and/or transactional. Consequently, some logical control mechanism must arrange the work of the microservices. On the other hand, a distributed logical model composed of microservices must be extensible and decoupled as much as possible. These requirements already contradict each other.

The modern approach to microservices architecture has 2 main paradigms:

  1. Microservices orchestration
  2. Microservices choreography.

Orchestration pattern.

The orchestration pattern, as the name suggests, is modeled on a real orchestra, where a conductor directs how the musicians play.


In the world of microservices, it is a model in which a controller encapsulates the business logic of a logical node (subdomain) and calls the services that execute the operations of that logic. Normally, the controller collects the results from the involved services and handles errors, returning either a meaningful result or an error notification.

The model is used mainly when our business logic contains a chain of operations, where the returned result is important and the rest of the operations depend on the result or the error.

In addition, the "orchestration" model frequently performs transactional operations, so attending to the services' response or error is an essential requirement inherent to orchestration.

This model of controlling a set of subdomain operations requires bidirectional communication where any response is handled.
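The controller role described above can be illustrated with a minimal sketch. The class name OrderOrchestrator and the two function-typed "services" are hypothetical stand-ins for remote calls (REST, gRPC, and so on); the point is only that one component owns the order of the calls and reacts to every result or error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical orchestrator: it owns the call order and handles every response. */
final class OrderOrchestrator {
    // Each function stands in for a blocking remote call to another service.
    private final Function<String, String> reserveStock;
    private final Function<String, String> chargePayment;
    final List<String> log = new ArrayList<>();

    OrderOrchestrator(Function<String, String> reserveStock,
                      Function<String, String> chargePayment) {
        this.reserveStock = reserveStock;
        this.chargePayment = chargePayment;
    }

    /** Runs the chain; the second call depends on the result of the first one. */
    String placeOrder(String orderId) {
        try {
            String reservation = reserveStock.apply(orderId);
            log.add("reserved:" + reservation);
            String receipt = chargePayment.apply(reservation);
            log.add("charged:" + receipt);
            return "OK:" + receipt;
        } catch (RuntimeException e) {
            // Errors of any involved service surface in the controller.
            log.add("failed:" + e.getMessage());
            return "ERROR:" + e.getMessage();
        }
    }
}
```

Note that every change in the chain (a new step, a changed contract) means changing this class, which is the coupling discussed in the downsides.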

However, the services collaboration model can be "one-directional," meaning that the result does not affect the rest of the chain.

Advantages of the Orchestration method.

  1. Simple to organize a workflow composed of synchronous and blocking asynchronous interactions over REST, WebFlux, gRPC, and various flavors of RMI.
  2. Simple to organize transactions: a single service implements the subdomain logic and encapsulates both the chain of operations and the transactional logic.
  3. Simple operation logging, for the same reasons as in 1 and 2.
  4. Very smooth migration from a monolith or SOA way of thinking to a microservices architecture.

The downsides of the Orchestration method.

  1. Logical coupling. All the subdomain business logic is encapsulated in one service, so any extension requires changing and redeploying this service whenever the logic, the interface, or the behavior of an involved service changes. This couples the development, deployment, and versioning of the controller with those of the services it composes, especially when the business logic managed by the controller spans a significant number of services. This is exactly the opposite of what a microservices architecture is meant to achieve.
  2. Single point of failure. Logical or programmatic failure of the controller leads to failure of the subdomain.

The Choreography pattern.

The choreography pattern, as the word "choreography" suggests, is modeled on a ballet or a group dance performance, where a performer:

  • acts upon a fragment of the melody
  • acts upon a gesture from another performer.

As in a choreography, each service is triggered by a message, event, or request from another one. Depending on the type of operation, each service in the sequence can return a result, an error, or just an acknowledgment. It can also return nothing and rely on the communication channel (such as Kafka) to signal message delivery.

The communication channels can be REST, gRPC, Kafka, RabbitMQ, or other messaging. They can be synchronous or asynchronous and can be either unidirectional or bidirectional.

Considering that the Choreography pattern is mainly event/message based (although it can use REST/gRPC), bidirectional message communication can be realized as follows.

A service sends a message to a topic/queue/exchange, which is read by one of several instances of the target service. Each service instance also has a dedicated topic/queue on which it receives the messages addressed to it, such as replies.

The same construction can also be used with the Orchestration pattern, but it is inherent to choreography.

In a nutshell: domain logic is distributed among the services of the domain and is realized mainly by the sequence of triggering.
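A minimal sketch of such a triggering sequence is shown below. TopicBus is a hypothetical in-memory stand-in for a broker such as Kafka or RabbitMQ, and the topic names are invented for illustration. No component knows the whole chain: each handler reacts to one event and publishes the next one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory topic bus; a real system would use a message broker. */
final class TopicBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String payload) {
        for (Consumer<String> handler
                : subscribers.getOrDefault(topic, Collections.emptyList())) {
            handler.accept(payload);
        }
    }
}

final class ChoreographyDemo {
    /** Wires three services so that each reacts to the event of its predecessor. */
    static List<String> run(String orderId) {
        TopicBus bus = new TopicBus();
        List<String> trace = new ArrayList<>();
        bus.subscribe("order.created", id -> {
            trace.add("inventory reserved " + id);
            bus.publish("stock.reserved", id);
        });
        bus.subscribe("stock.reserved", id -> {
            trace.add("payment charged " + id);
            bus.publish("payment.done", id);
        });
        bus.subscribe("payment.done", id -> trace.add("shipping scheduled " + id));
        // Publishing the first event starts the whole chain.
        bus.publish("order.created", orderId);
        return trace;
    }
}
```

Adding a step means subscribing one more handler to an existing topic, without touching the other services.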

Advantages of Choreography.

  1. Minimal (almost zero) coupling of services. Each service triggers the next service by a request/event and gets results when needed. There is no controller; the logic is distributed among the services. Even if the contract/API of a service changes, it concerns only the two neighbors.
  2. Business logic can be modified simply by putting a new service into the chain of calls: it is just a configuration change that makes services send messages to another topic/queue. However, this works well only when the change can be implemented as a "decoration".
  3. Neither of these operations requires a controller.

This is a nice construction, especially if we want to reduce coupling, maximally reuse microservices, and consider Open Architecture options looking forward to extending the system.

However, there are serious challenges.

Downsides of choreography pattern:

  1. It is hard to organize subdomain logic when the next operation depends on the result of previous operations.
  2. Transactional subdomain logic is a challenge. The progress of every request must be logged by the involved services and shared between the service instances. Note that a service that has already been triggered can fail at the moment the transaction should be rolled back.
  3. The same as 2 applies to durable operations: when a service or its JVM fails, the request must be resumed after a restart, possibly on a new machine. This raises the question of persistence.
  4. Returning a result through the chain of services requires waiting until the last service has executed.

Problems and possible solution ideas.

  1. Coordinate the invocation/triggering of services conveniently, so that the coordination is encapsulated in a service, but this service is not a single point of failure and its logic knows as little as possible about the interaction contracts of the involved services.
  2. The orchestration paradigm is an excellent base for implementing distributed processing logic, but to satisfy 1, the business logic of an orchestration service should be as formalizable as possible.
  3. Why should the orchestrator have executable business logic? Could it be formalized to operate on a workflow recipe that is transferred transparently along the whole workflow execution?
  4. Can we use the Choreography pattern so that the triggering and result events/messages carry the input and output data together with the state of the workflow?

1+2+3 could make the Orchestrator indifferent to the message/event contracts of the involved services, so that it just executes simple operations, like a chef following a recipe.

In this case the "Orchestrator" loses its value as a real orchestrator, while the Choreography pattern (reaction to an event) keeps working.

On the other hand, each business logic service should just handle incoming messages according to a contract that is blind to the business payload of the message, and send the response back to the router. This aspect is inherent to Choreography.


The Fat Events pattern can serve as a compromise between Choreography and Orchestration: it takes their advantages and significantly reduces their downsides (for a broad spectrum of cases).

The Fat Events pattern is not new; it is used for decoupling components of the domain logic within DDD.

In a nutshell, a fat event transports data that is not changed during the execution of the corresponding command in the invoked service.
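As a rough illustration, a fat event can be sketched as a business payload plus an opaque part that the invoked service carries over untouched. The names FatEvent, PriceService, and the "quantity"/"total" keys are hypothetical and exist only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fat event: business payload plus an opaque part the service must not touch. */
final class FatEvent {
    final String commandId;
    final Map<String, Object> commandData;
    private final byte[] opaqueState;

    FatEvent(String commandId, Map<String, Object> commandData, byte[] opaqueState) {
        this.commandId = commandId;
        this.commandData = new HashMap<>(commandData);
        this.opaqueState = opaqueState.clone();
    }

    /** Returns a copy, so that callers cannot modify the carried state. */
    byte[] opaqueState() {
        return opaqueState.clone();
    }

    /** Builds the reply: new business data, the same opaque state. */
    FatEvent replyWith(Map<String, Object> resultData) {
        return new FatEvent(commandId + ".done", resultData, opaqueState);
    }
}

final class PriceService {
    /** Reads only commandData; the opaque state is passed through as is. */
    static FatEvent handle(FatEvent command) {
        int quantity = (Integer) command.commandData.get("quantity");
        Map<String, Object> result = new HashMap<>();
        result.put("total", quantity * 5); // an invented unit price of 5
        return command.replyWith(result);
    }
}
```

The service never deserializes or inspects the opaque part, so its content can change without affecting the service contract.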

Our concept of the FAT event usage:

  1. Create a RECIPE that formalizes the orchestration logic of workflow management as cooking stages, each of which describes the next request and how the results of that request are used.
  2. Create a state of the workflow, which is modified by the service that replaces the orchestrator.
  3. Transport the state transparently along the workflow progress.

We extend the description of Fat Events by assuming that a fat event also contains data that is transparent to the service and carries information managed by the replacement of the Orchestrator.

Another aspect of this principle is to free the (let us say) orchestrator service from keeping the state of the distributed workflow (and from taking care of the state's persistence for durability and transactions).

Using commands and events for services triggering and data back notification.

The command ID and command data of a Fat Message/Event are used for internal execution in the service. The transparent workflow state is updated (!) and used in the orchestrator.

But this is not enough for our goal: although the Orchestrator keeps its state in the command data, it still has its own business logic.

What else do we want? What is the idea?

Just a small thing: formalize a distributed workflow definition, which will be the workflow recipe, and include in it:

  • workflow identifier
  • workflow data
  • workflow status
  • list of the workflow stages, with the input and output parameters of each stage and its transactional parameters
  • output data of the workflow
  • error message
  • any other additional information

We need just to define the metadata of the RECIPE, which describes a workflow of service calls, where each of the calls is a stage.

The RECIPE should contain the description of its stages, including the identifiers of the commands and which parameters should be taken from the flow state and inserted into the parameters of the command/event of a service call. It is used by the replacement of the orchestration mechanism to create and update the workflow state while managing service calls.

The flow state, thus, should keep the identifier of the related flow (to find its recipe), the URI of the client, and a map of parameters that must be transmitted as input and output of service calls.

The state data can be transmitted as a transparent part of the event/command of a call, or kept in external storage.


Transmission of a workflow state in a FAT EVENT.

Defining a workflow recipe.

Let us define the recipe (StageRecipe) that describes a stage of work:

import java.util.HashMap;
import java.util.Map;
import lombok.Data;

@Data
public class StageRecipe {
    /** Status of the flow execution. */
    public enum ExecutionStatus {progress, success, rollback, failed}

    /** Identifier of the command of this stage. */
    private String commandId;

    /** URI of the service that receives the command of this stage. */
    private String serviceURI;

    /** Whether the stage is transactional. */
    private boolean transactional = false;

    /** Maps keys of the common flow data onto input parameter keys of this stage. */
    private Map<String, String> inputParamsMapping = new HashMap<>();

    /** Maps output parameter keys of this stage onto keys of the common flow data. */
    private Map<String, String> outputParamsMapping = new HashMap<>();
}

The purpose of most fields is clear.

The inputParamsMapping field contains a key-value map, where the key is a key in the commonFlowData map of the FlowStateData object (which keeps the workflow data during execution), and the value is the key under which that value is passed in the event/command sent to the service.

outputParamsMapping serves the same purpose for output parameters.

The reason for the mapping: the key of a parameter in the command data may differ from the key of the same parameter in the parameter map of the whole workflow. We will return to this aspect.
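The remapping in both directions can be sketched as two small helpers. The class ParamMapping and its method names are hypothetical; the direction of each mapping follows the description above (flow key to command key for input, service output key to flow key for output).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that renames keys between the flow-wide map and a command map. */
final class ParamMapping {
    /** Copies values from the flow-wide map into a command map, renaming keys on the way. */
    static Map<String, Object> toCommandParams(Map<String, Object> commonFlowData,
                                               Map<String, String> inputParamsMapping) {
        Map<String, Object> params = new HashMap<>();
        inputParamsMapping.forEach((flowKey, commandKey) -> {
            if (commonFlowData.containsKey(flowKey)) {
                params.put(commandKey, commonFlowData.get(flowKey));
            }
        });
        return params;
    }

    /** Reverse direction: service output keys are written back under flow-wide keys. */
    static void mergeOutput(Map<String, Object> commonFlowData,
                            Map<String, Object> serviceOutput,
                            Map<String, String> outputParamsMapping) {
        outputParamsMapping.forEach((outputKey, flowKey) -> {
            Object value = serviceOutput.get(outputKey);
            if (value != null) {
                commonFlowData.put(flowKey, value);
            }
        });
    }
}
```

With a mapping of stage2.accountID to accountId, the service sees only the business name accountId, while the flow-wide map keeps its own naming convention.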

The StageRecipe as mentioned defines a stage of the workflow. It is contained in the list of stages in the FlowRecipe, which describes the workflow and will be used for forming service calls, workflow management, and updating its state.

A workflow recipe defines the workflow process and will be used to build workflow state, workflow stages, and command events to be handled by services. Its content is quite transparent:

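import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Data;

@Data
public class FlowRecipe {
    /** Identifier of the recipe, used to build the flow data. */
    private String recipeId;

    /** URI of the router that dispatches the initial request and the response. */
    private String recipeRouterURI;

    /** List of stage definitions {@link StageRecipe}. */
    private List<StageRecipe> stages = new ArrayList<>();

    /** Maps keys of the trigger event parameters onto keys of the common flow data. */
    private Map<String, String> inParamsMap = new HashMap<>();

    /** Maps keys of the common flow data onto keys of the final response. */
    private Map<String, String> outParamsMap = new HashMap<>();
}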

It has the recipeId identifier, which the Orchestrator/Router uses to build the workflow state and to find the related RECIPE when the state is deserialized from the transparent field of the call being handled.

The second important parameter is recipeRouterURI, the URI of the Orchestrator/Router to which results are sent.

The FlowRecipe is used to create the FlowStateData instance to keep the state of the current workflow instance. This is to be performed by the service, which replaces the Orchestrator.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.Data;

@Data
public class FlowStateData {
    /** Identifier of the recipe for the router. */
    private String recipeId;

    /** URI of the client that sent the source request. */
    private String clientURI;

    /** UUID of the current flow. */
    private String uuid = UUID.randomUUID().toString();

    /** Error message. */
    private String errorMessage;

    /** Status of the execution. */
    private StageRecipe.ExecutionStatus executionStatus;

    /** Flow data map with the input, output, and intermediate data referred to by the stage recipes. */
    private Map<String, Object> commonFlowData = new HashMap<>();

    /** Transaction data to be used on rollback. */
    private Map<String, Map<String, Object>> transactionalData = new HashMap<>();

    /** Index of the current stage. */
    private int stage;
}

This state has a field recipeId that links the flow to its recipe, which is used to create the state data of the flow, generate events/commands for services, and accept results.

Other crucial fields of the FlowStateData:

  • uuid - a unique identifier of the flow instance
  • errorMessage - set when an error has occurred; it means the end of the workflow progress
  • executionStatus - status of the distributed flow progress. If the errorMessage field is set, then this value should be either rollback or failed.
  • commonFlowData - map of all data of this flow, including input, intermediate, and output data. This data is remapped to command data while generating commands/events for services.
  • transactionalData - transaction data for rollback
  • stage - index of the current stage, whose recipe is defined in the FlowRecipe.

Now, however, the "Orchestrator" has no specific business logic. It just performs operations on the data fields of the RECIPE and the state, which are common to any business logic.

Thus, the "Orchestrator" no longer makes sense as a business subdomain component and becomes just a router, albeit a smart one. It should not be related to domain logic and becomes part of the infrastructure layer.

The router is capable of obtaining FlowRecipe schemas, creating a FlowStateData, which is the state of the projected distributed workflow, creating the StageExecutionCommand to call a service, and processing the StageExecutedEvent.

The Orchestrator becomes a smart router, so let's call it the SmartRouter.

It will be displayed in all diagrams as follows.

By design, the FlowStateData is transported transparently along the workflow execution.

The FlowStateData is created, managed, and used by the SmartRouter.

The SmartRouter creates and routes command events to the services, gets the results, and manages the state of the distributed workflow.

Recipe, state, command, and events object relationships.

The StageExecutionCommand is built by the SmartRouter by following the recipe for this stage, which is contained in the FlowRecipe. The index of the current stage is kept in the FlowStateData#stage field.

The FlowStateData is used ONLY INTERNALLY in the SmartRouter. Its serialized form is set as a blob into the StageExecutionCommand event data, transferred transparently, and returned by services.
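The round trip of the state through a blob can be sketched as follows. This is only an illustration under stated assumptions: MiniFlowState is a hypothetical, reduced stand-in for FlowStateData, and built-in Java serialization is used here to keep the example free of dependencies, whereas the article's code appears to rely on a JSON mapper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Hypothetical, reduced flow state used only for this sketch. */
final class MiniFlowState implements Serializable {
    private static final long serialVersionUID = 1L;
    String recipeId;
    int stage;
    final HashMap<String, Object> commonFlowData = new HashMap<>();
}

final class StateBlob {
    /** Serializes the state into the opaque blob that travels inside the command. */
    static byte[] toBlob(MiniFlowState state) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Restores the state from the blob returned by a service. */
    static MiniFlowState fromBlob(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (MiniFlowState) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the restored object is a fresh copy, any router instance that receives the blob can continue the flow without shared memory or external state storage.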

The StageExecutionCommand is created by the SmartRouter using the FlowStateData instance and the StageRecipe that corresponds to the current stage.

The StageExecutionCommand has the following crucial fields:

  • reason (what is going on: workflow progress, rollback, ...)
  • parameters: map (or an object of the corresponding type) of input data for the service
  • transactionData: map of data for transaction rollback (usable together with reason == rollback)

Once the command has been executed by the target service, the StageExecutedEvent is sent to the SmartRouter. It has the following crucial fields:

  • outParams: output parameter values, which are returned by the service
  • error: an error message/id or anything else that signals an error (the SmartRouter will change the distributed flow status to either "failed" or "rollback")
  • blob: transparent parameter, which must have the same value as in the StageExecutionCommand. The SmartRouter restores the FlowStateData from this value for further updates.

The SmartRouter builds the StageExecutionCommand#parameters by taking values from the FlowStateData#commonFlowData, using the key mapping rules from the StageRecipe#inputParamsMapping.

A remark about parameter mapping between FlowStateData#commonFlowData and the StageExecutionCommand#parameters.

The mapping procedure described above is useful because the names of the keys in the FlowStateData#commonFlowData might be complicated (such as stage2.accountID), while the StageExecutionCommand#parameters may be mapped to a data class of the business logic of a service. Besides, the naming convention of FlowStateData#commonFlowData, which is an internal object of the SmartRouter (and does not belong to a business domain), should not be coupled with the parameter names of a concrete service: the names of a service's call parameters should have a business reason.

The StageExecutionCommand is built in the SmartRouter, involving "objects for relational use only" - FlowStateData and StageRecipe. But only the StageRecipe is kept permanently (in the FlowRecipe).


When the execution result is ready, the StageExecutedEvent is built on behalf of the target Service.

As we see, the FlowStateData is transported transparently through the call and will be used by the SmartRouter while processing the StageExecutedEvent that has been sent by a service of the workflow.

The code snippet below demonstrates a simple example of the StageExecutionCommand building in the SmartRouter using the instance of FlowStateData and FlowRecipe:

    /** Builds the command for the next stage, or returns null when no stage is left. */
    private static StageExecutionCommand nextRequestInProgress(FlowStateData data, FlowRecipe recipe) {
        List<StageRecipe> stages = recipe.getStages();
        if (stages.size() <= data.getStage()) {
            return null;
        }
        // next() is assumed to advance FlowStateData#stage and return the same instance
        int stageId = data.next().getStage();
        StageRecipe nextRecipe = stages.get(stageId);
        // the whole flow state travels inside the command as an opaque blob
        byte[] serialized = serialize(data, getExternalInfo());
        StageExecutionCommand result = new StageExecutionCommand(serialized, nextRecipe.getCommandId(), data.getUuid());
        result.setReason(data.getExecutionStatus());
        // assumption: the operation name is the command id of the stage
        result.setOperation(nextRecipe.getCommandId());
        result.setTargetUrI(nextRecipe.getServiceURI());
        // copy the mapped values from the common flow data into the command parameters
        Map<String, Object> nextData = data.getCommonFlowData();
        Map<String, Object> actualData = result.getParameters();
        nextRecipe.getInputParamsMapping().forEach((k, v) -> actualData.put(v, nextData.get(k)));
        return result;
    }

This is an example of the StageExecutedEvent handling in the SmartRouter.

    /**
     * Handles the response of a service for the current stage.
     *
     * @param result  event with the result of the last executed command
     * @param dataExt flow state if it is already restored, or null to restore it from the event blob
     * @param recipe  recipe of the flow being processed
     * @return the next command, the final response, or the incoming event when the rollback has reached the first stage
     */
    public static FlowExchangeDTO processResponse(StageExecutedEvent result, FlowStateData dataExt, FlowRecipe recipe) {
        // restore the flow state from the blob carried by the event unless it was passed in
        FlowStateData data = dataExt != null ? dataExt : fromSerialized(result);
        if (result.getErrorCode() == null && (data.getExecutionStatus().equals(success) || data.getExecutionStatus().equals(progress))) {
            // copy the output of the service into the common flow data under the mapped keys
            Map<String, Object> params = result.getParameters();
            recipe.getStages().get(data.getStage()).getOutputParamsMapping().forEach((k, v) -> {
                Object back = params.get(k);
                if (back != null) {
                    data.getCommonFlowData().put(v, back);
                }
            });
            if (data.getStage() == recipe.getStages().size() - 1) {
                // the last stage is done: build the final response for the client
                data.setExecutionStatus(success);
                Map<String, Object> map = new HashMap<>();
                recipe.getOutParamsMap().forEach((k, v) -> map.put(v, data.getCommonFlowData().get(k)));
                FinalResponse response = new FinalResponse(result, data.getClientURI(), map);
                response.setOperation(recipe.getRecipeId());
                return response;
            }
            // keep the transaction data of this stage for a possible rollback
            data.getTransactionalData().put(result.getOperation(), result.getTransactionData());
            return generateRequest(data, recipe);
        }

        if (data.getExecutionStatus().equals(rollback) && data.getStage() == 0) {
            // the first stage has been compensated, nothing is left to roll back
            return result;
        }
        data.setExecutionStatus(failed);
        // scan backward for the nearest transactional stage before the current one
        for (int i = data.getStage() - 1; i >= 0; i--) {
            StageRecipe sr = recipe.getStages().get(i);
            if (sr.isTransactional()) {
                data.setStage(i);
                data.setExecutionStatus(rollback);
                return generateRequest(data, recipe);
            }
        }
        // no transactional stage behind: report the failure to the client
        return new FinalResponse(result, data.getClientURI(), null);
    }
        

The handling code performs very simple operations:

  1. Restores the FlowStateData instance of the current flow.
  2. If no error is reported, the output parameters are extracted from the execution event and mapped onto the common parameters of the FlowStateData. If the last stage has been executed, the result is formed and returned; otherwise, the next request is generated.
  3. If an error is reported, the StageRecipe entries of the previous stages are scanned backward until the first transactional stage is found; the status is set to rollback, and a rollback request is generated. If there is no transactional stage behind, the final result is formed and returned.
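The backward scan of step 3 can be isolated into a small function. The sketch below is hypothetical: it represents each stage only by its transactional flag and assumes that the stage which has just failed (or has just been compensated) needs no further rollback itself.

```java
import java.util.List;

/** Hypothetical helper that finds the next stage to compensate during a rollback. */
final class RollbackPlanner {
    /**
     * Returns the index of the nearest transactional stage strictly before
     * currentStage, or -1 when nothing is left to roll back.
     */
    static int previousTransactionalStage(List<Boolean> transactionalFlags, int currentStage) {
        int start = Math.min(currentStage, transactionalFlags.size()) - 1;
        for (int i = start; i >= 0; i--) {
            if (transactionalFlags.get(i)) {
                return i;
            }
        }
        return -1;
    }
}
```

Calling the function repeatedly with the returned index walks the workflow back one transactional stage at a time, until -1 signals that the final (failed) response can be sent to the client.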

So the SmartRouter does not even need to keep the state of the distributed workflow. Furthermore, it knows nothing about what is done in the distributed workflow, which is exactly what we want.

As we see, each successful step increments the stage by 1 and, if there are still stages to be executed in the distributed workflow, the SmartRouter gets the related StageRecipe and builds the StageExecutionCommand.

If no steps are left, the result is returned to the initiator of the workflow instance.

If a command execution is unsuccessful, the stage is decremented, the status is set to rollback, and a new rollback command is generated to roll back the distributed flow.

The SmartRouter uses the StageExecutionCommand#targetURI parameter as the destination URI to send the data to. The parameter is copied from the StageRecipe#serviceURI.

Assuming that the SmartRouter and the Services communicate via a message queue (Kafka, for example), let us see the whole picture.


Command - Recipe segregation

As we see, there are 2 separate data flows in the interactions while progressing the distributed flow.

  1. StageExecutionCommand and StageExecutedEvent are the transport objects for command and result exchange between the SmartRouter and the services along the progress of the workflow. At the same time, both of them carry the FlowStateData transparently through the service. The StageExecutionCommand is generated by the SmartRouter and sent to the related message queue. The StageExecutedEvent is created by a Service and directed to the queue of the SmartRouter.
  2. FlowStateData and FlowRecipe are used in the SmartRouter solely. FlowStateData is created and updated solely in the SmartRouter and never outside it.

The business logic of a Service knows nothing about the business logic of other Services or the current workflow, nor about where the result will be directed and which service will be triggered next.

The SmartRouter, as mentioned, knows nothing in advance about which service will be called (it only follows the recipe) or about the contract of the command message.

Full Workflow Sequence.

Updating the SmartRouter service with the FlowRecipes of the expected workflows:

A Configuration Service sends the FlowRecipe list to the SmartRouter, which puts it into its RecipeStorage.
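The article does not show the RecipeStorage itself, so the following is only a guess at its simplest possible shape: a thread-safe map from recipe identifier to recipe. The generic parameter R stands in for FlowRecipe to keep the sketch self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical recipe storage: recipes are pushed from outside and looked up by ID. */
final class RecipeStorage<R> {
    private final Map<String, R> recipes = new ConcurrentHashMap<>();

    /** Adds or replaces recipes, for example on a configuration update. */
    void putAll(Map<String, R> incoming) {
        recipes.putAll(incoming);
    }

    /** Looks a recipe up; an unknown or null ID yields an empty result. */
    Optional<R> find(String recipeId) {
        if (recipeId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(recipes.get(recipeId));
    }
}
```

Since recipes arrive as configuration data, a workflow can be changed by pushing a new recipe, without redeploying the router.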

Triggering workflow in SmartRouter:

A client Service sends an execution request, a FlowTriggerEvent, to the SmartRouter, which retrieves the related FlowRecipe from the RecipeStorage.

FlowTriggerEvent was sent and the FlowRecipe extracted

The SmartRouter uses the extracted FlowRecipe to create the FlowStateData, which will be used to generate a StageExecutionCommand for each stage along the workflow progress.

A sample code snippet:

    /**
     * Processes the incoming triggering event {@link FlowTriggerEvent}: retrieves the related
     * {@link FlowRecipe} from the storage map and returns the first {@link StageExecutionCommand}.
     *
     * @param recipeMap        map of recipes by ID
     * @param flowTriggerEvent instance of the {@link FlowTriggerEvent}, which contains the data necessary to create the workflow
     * @return the {@link StageExecutionCommand} that triggers the first stage of the workflow,
     *         or null if no recipe is registered for the operation
     */
    public StageExecutionCommand trigFlow(Map<String, FlowRecipe> recipeMap, FlowTriggerEvent flowTriggerEvent) {
        FlowRecipe recipe = recipeMap.get(flowTriggerEvent.getOperation());
        if (recipe == null) {
            return null;
        }
        FlowStateData data = new FlowStateData();
        // the state must reference its recipe so that the router can find it later
        data.setRecipeId(recipe.getRecipeId());
        // assumption: a new flow starts in the progress status
        data.setExecutionStatus(StageRecipe.ExecutionStatus.progress);
        data.setClientURI(flowTriggerEvent.getClientUri());
        data.setUuid(flowTriggerEvent.getUuid());
        // map the parameters of the trigger event onto the common flow data
        Map<String, Object> mapData = new HashMap<>();
        data.setCommonFlowData(mapData);
        recipe.getInParamsMap().forEach((k, v) -> mapData.put(v, flowTriggerEvent.getParameters().get(k)));
        return generateRequest(data, recipe);
    }
        

The addressed Service processes the command.

The Service that accepts the generated StageExecutionCommand executes it and forms the StageExecutedEvent. The StageExecutedEvent takes the blob from the StageExecutionCommand and is then returned to the SmartRouter.

A Service processes the StageExecutionCommand and transparently returns blob with the result

Now the SmartRouter accepts and handles the StageExecutedEvent and updates the FlowStateData.

The incoming StageExecutedEvent brings the return parameters and the blob containing the serialized FlowStateData.

The FlowStateData always contains the identifier of the related FlowRecipe. The SmartRouter gets the return data from the event and maps it onto FlowStateData#commonFlowData, computes the next stage in the FlowStateData#stage, retrieves the proper StageRecipe, creates the new StageExecutionCommand, maps FlowStateData#commonFlowData onto the StageExecutionCommand#parameters, and sends the command to the next Service.

The whole process scheme:

Recipe execution sequence and data relationships.
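The whole sequence can be condensed into a deliberately simplified, in-process sketch. MiniStage and MiniSmartRouter are hypothetical names, services are plain functions instead of message consumers, and the state is kept in a local map instead of travelling as a blob; only the recipe-driven, data-blind routing is illustrated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, reduced stage description (a subset of the StageRecipe idea). */
final class MiniStage {
    final String commandId;
    final Map<String, String> inputMapping;   // flow key -> command key
    final Map<String, String> outputMapping;  // service output key -> flow key

    MiniStage(String commandId, Map<String, String> inputMapping,
              Map<String, String> outputMapping) {
        this.commandId = commandId;
        this.inputMapping = inputMapping;
        this.outputMapping = outputMapping;
    }
}

final class MiniSmartRouter {
    private final Map<String, Function<Map<String, Object>, Map<String, Object>>> services =
            new HashMap<>();
    final List<String> trace = new ArrayList<>();

    void register(String commandId,
                  Function<Map<String, Object>, Map<String, Object>> service) {
        services.put(commandId, service);
    }

    /** Runs all stages in order; the router only moves data and never interprets it. */
    Map<String, Object> run(List<MiniStage> recipe, Map<String, Object> input) {
        Map<String, Object> flowData = new HashMap<>(input);
        for (MiniStage stage : recipe) {
            Function<Map<String, Object>, Map<String, Object>> service =
                    services.get(stage.commandId);
            if (service == null) {
                throw new IllegalStateException("No service for " + stage.commandId);
            }
            // Build the command parameters from the flow data using the input mapping.
            Map<String, Object> params = new HashMap<>();
            stage.inputMapping.forEach((flowKey, cmdKey) ->
                    params.put(cmdKey, flowData.get(flowKey)));
            Map<String, Object> output = service.apply(params);
            // Write the service output back into the flow data using the output mapping.
            stage.outputMapping.forEach((outKey, flowKey) ->
                    flowData.put(flowKey, output.get(outKey)));
            trace.add(stage.commandId);
        }
        return flowData;
    }
}
```

Changing the workflow here means changing only the list of MiniStage objects; neither the router nor the services are modified.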

BENEFITS

  1. We combine the encapsulation of workflow logic, which is inherent to the Orchestration pattern, with the maximal decoupling of services, which is inherent to the Choreography pattern. This approach has neither orchestrator-to-service coupling nor any knowledge of a concrete service data contract.
  2. No redeployment of the pseudo-orchestrator (SmartRouter) is needed to modify a workflow: the SmartRouter service works blindly, using just the FlowRecipe, which is obtained externally as configuration data (like an old-time pharmacy worker who does not know what medicine is being prepared).
  3. The logic of workflow management is formalized and moved into the metadata of the domain logic (JSON, for instance), and the metadata is readable in any language.
  4. The SmartRouter services become just an infrastructure component; they are no longer related to business logic (subdomain).
  5. Workflow execution can be continued on any machine from the same state/stage where it was interrupted or failed. If a failure occurs while routing, the operation is repeated on another machine, where the next instance of the SmartRouter has been started instead. No special saving of state data is required.
  6. A workflow transaction rolls back in a straightforward and durable way if a failure occurs, for the same reasons as in 5.

Downsides:

  1. Serialization/deserialization of the FlowStateData structure costs processing time and can hurt performance. Besides, the volume of serialized data can be large, making message transfer slower.
  2. Parallel execution of the workflow is problematic.
  3. The realization is not straightforward. A Service has to implement the command-event data transport contract, which is essential for formalized processing in the SmartRouter. This is a small challenge, but it can be a source of errors. It is, however, a challenge of the same level as implementing gRPC, SOAP, or another protocol. We just need to create a simple API for a custom service that hides the operations inherent to the protocol and forms the event modeled by the StageExecutedEvent.
  4. All of this works as long as there are no logical branches, that is, when the progress of the workflow depends only on the success or failure of the previous stage. If the next stage depends on the output parameters of the previous stage, then smarter logic is required.

An example of the command-data contract implementation is a simple API.

Exposed interface for a service creation

import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;

public interface ICommandHandlingHub {

    /**
     * Registers the business logic and the transaction compensator for a command.
     *
     * @param command                command identifier
     * @param inType                 type of the input data of the business logic
     * @param executor               function that executes the business logic
     * @param transactionCompensator compensator to be called when the status is {@link StageRecipe.ExecutionStatus#rollback}
     * @param <IN>                   input type
     * @param <OUT>                  output type
     */
    <IN, OUT> void addCommandHandling(String command, Class<IN> inType, BiFunction<IN, Map<String, Object>, OUT> executor, Consumer<Map<String, Object>> transactionCompensator);

    /**
     * Executes the business logic and forms the result as an event.
     *
     * @param command command to be executed
     * @return a Reactor {@link Mono} that emits the resulting {@link StageExecutedEvent}
     */
    Mono<StageExecutedEvent> execute(StageExecutionCommand command);
}

It exposes methods for

  • registering, per command, the business logic as a BiFunction, the transaction compensator as a Consumer, and the class of the business logic's input data
  • executing an incoming command
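The handlers shown later in this article all pass null as the compensator. To illustrate how an executor and a compensator can cooperate through the transactional data map, here is a minimal sketch. MiniHub is a hypothetical, simplified stand-in for the hub (plain maps, no Reactor, no JSON conversion), and the in-memory lockedByBuyer map is an assumption that replaces a real account database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for the hub: plain maps, no Reactor, no JSON conversion. */
final class MiniHub {
    private final Map<String, BiFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>>> executors = new HashMap<>();
    private final Map<String, Consumer<Map<String, Object>>> compensators = new HashMap<>();

    /** Registers the business logic and an optional compensator for a command. */
    void addCommandHandling(String command,
                            BiFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> executor,
                            Consumer<Map<String, Object>> compensator) {
        executors.put(command, executor);
        if (compensator != null) {
            compensators.put(command, compensator);
        }
    }

    /** "progress": runs the business logic; txData collects what a later rollback will need. */
    Map<String, Object> progress(String command, Map<String, Object> params, Map<String, Object> txData) {
        return executors.get(command).apply(params, txData);
    }

    /** "rallback": calls the compensator, if any, with the transactional data saved earlier. */
    void rollback(String command, Map<String, Object> txData) {
        Consumer<Map<String, Object>> compensator = compensators.get(command);
        if (compensator != null) {
            compensator.accept(txData);
        }
    }

    /** Locks funds, then compensates; returns the amount still locked for the buyer. */
    static double lockThenRollback() {
        Map<String, Double> lockedByBuyer = new HashMap<>(); // in-memory stand-in for the account database
        MiniHub hub = new MiniHub();
        hub.addCommandHandling("lockFunds",
            (in, tx) -> {
                String buyer = (String) in.get("buyerID");
                double amount = (Double) in.get("amount");
                lockedByBuyer.merge(buyer, amount, Double::sum);
                tx.put("buyerID", buyer); // remembered for the compensator
                tx.put("locked", amount);
                Map<String, Object> out = new HashMap<>();
                out.put("buyerID", buyer);
                out.put("locked", amount);
                return out;
            },
            tx -> {
                double amount = (Double) tx.get("locked");
                lockedByBuyer.merge((String) tx.get("buyerID"), -amount, Double::sum);
            });

        Map<String, Object> params = new HashMap<>();
        params.put("buyerID", "somebody@gmail.com");
        params.put("amount", 1200000.0);
        Map<String, Object> txData = new HashMap<>();
        hub.progress("lockFunds", params, txData);
        hub.rollback("lockFunds", txData);
        return lockedByBuyer.get("somebody@gmail.com");
    }

    public static void main(String[] args) {
        System.out.println(lockThenRollback());
    }
}
```

The point of the sketch is the division of labor: the executor writes into the transactional map everything the compensator will need, so the compensator does not depend on the original command parameters.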

Implementation is quite straightforward.

public enum CommandHandlingHub implements ICommandHandlingHub
{
   inventory{
       /** map of required handlers per command */
       Map<String, Triple<Class,BiFunction,Consumer>> mapper=new HashMap<>();
       @Override
       public <IN, OUT> void addCommandHandling(String command, Class<IN> inType, BiFunction<IN, Map<String, Object>, OUT> executor, Consumer<Map<String, Object>> compensator) {
           mapper.put(command, new Triple<Class, BiFunction, Consumer>() {
               @Override
               public Class getLeft() {
                   return inType;
               }
               @Override
               public BiFunction getMiddle() {
                   return executor;
               }

               @Override
               public Consumer getRight() {
                   return compensator;
               }
           });
       }
       @Override
       public Mono<StageExecutedEvent> execute(StageExecutionCommand command) {
           Triple<Class,BiFunction,Consumer> triple= mapper.get(command.getOperation());
           return StageExecutionProcedure.prepareCommandExecution(command,triple.getLeft(), triple.getMiddle(),triple.getRight()).andExecute();
       }
   }

}        

Building the execution procedure is also straightforward:


/**
 *
 * @param <IN> input params type
 * @param <OUT> output type
 */
public class StageExecutionProcedure<IN,OUT> {
    /** JSON mapper */
    private static final ObjectMapper theMapper = new ObjectMapper();

    /** type to convert to map */
    private static final TypeReference<Map<String,Object>> mapType= new TypeReference<Map<String,Object>>() {};
    public static final String UNEXPECTED_STATUS = "unexpected status";

    /** event to be updated by execution result and returned */
    StageExecutedEvent  event = new StageExecutedEvent();

    /** processing function, which gets the input parameter and an in-out map that accepts transactional data,
     *  and returns the processing result */
    private BiFunction<IN,Map<String,Object>,OUT> procedure;

    /** compensation consumer, which will be called when the status is rallback */
    private Consumer<Map<String, Object>> compensator;

    /** source command */
    private StageExecutionCommand command;

    /** input type */
    private Class<IN> inType;

    /**
     *  creates instance of a command executor
     * @param command - source command
     * @param inType  - input type
     * @param executor - executor function
     * @param transactionCompensator - compensator which should be called when status is {@link StageRecipe.ExecutionStatus#rallback}
     * @return  instance of  the  class
     * @param <IN>
     * @param <OUT>
     */
    public static <IN,OUT> StageExecutionProcedure<IN,OUT> prepareCommandExecution(StageExecutionCommand command, Class<IN> inType, BiFunction<IN,Map<String,Object>,OUT> executor, Consumer<Map<String,Object>> transactionCompensator){
        StageExecutionProcedure<IN,OUT> executionProcedure = new StageExecutionProcedure<>();
        executionProcedure.event.setOperation(command.getOperation());
        executionProcedure.event.setUuid(command.getUuid());
        executionProcedure.command= command;
        executionProcedure.procedure= executor;
        executionProcedure.inType= inType;
        executionProcedure.compensator= transactionCompensator;
        return executionProcedure;
    }

    /**
     * executes the prepared execution procedure
     * @return a Reactor Mono that emits the resulting StageExecutedEvent
     */
    Mono<StageExecutedEvent> andExecute(){
        Mono<StageExecutedEvent> res= Mono.create((ms)->{
            String[] errorMessage= new String[1];
            Map<String,Object> out= null;
            Map<String,Object> rallbackParams = new HashMap<>();
            switch(command.getReason()){
                case progress -> {
                   out=tryExecute(rallbackParams,errorMessage);
                }
                case rallback->{
                    rallback(command.getParameters(),errorMessage);
                }
                default ->{
                    errorMessage[0]= UNEXPECTED_STATUS;
                }
            }
            event.setErrorCode(errorMessage[0]);
            event.setTransactionData(rallbackParams);
            event.setBlob(command.getBlob());
            if(out!=null)
                event.getParameters().putAll(out);
            ms.success(event);
        });
        return res;

    }

    /**
     *  executes  service's business logic, as a BiFunction
     * @param rallbackParams - map which accepts parameters of transaction if required
     * @param errorMessage - reference on the error message.
     * @return   - output parameters in the form of map
     */
    private Map<String,Object> tryExecute(Map<String,Object> rallbackParams, String [] errorMessage){
        try {
            IN in = theMapper.convertValue(command.getParameters(),inType);
            return theMapper.convertValue(procedure.apply(in,rallbackParams),mapType);
        }
        catch(Throwable t){
            errorMessage[0]= t.getMessage();
            return null;
        }
    }
    private void rallback(Map<String,Object> rallbackParams, String [] errorMessage){
        try {
            compensator.accept(rallbackParams);
        }
        catch(Throwable t){
            errorMessage[0]= t.getMessage();
        }
    }
}
        

Fighting the potentially large size of state data.

As mentioned, the state data might be large, and transporting it transparently in StageExecutionCommand#blob and StageExecutedEvent#blob could be problematic, for example, for Kafka.

Remember that a Kafka message is transferred and redirected between brokers over TCP/IP, where the atomic transfer unit is about 1 KB, so a large message is split across many packets. While Kafka is a throughput champion, growing message size leads to performance degradation. FlowStateData can become large mainly because FlowStateData#commonFlowData grows with the size of the values it keeps. Consequently, the blob field grows, together with the command and event messages.

What is the solution? The first and easiest way is to keep the large portion of the data in external storage. This approach is common with Kafka: large messages are replaced with references (URIs) to external storage such as S3, EFS, Redis, and so on.

For our purposes, StageExecutionCommand#blob and StageExecutedEvent#blob will carry a reference to Redis.


public class DataURI {
    /** marker that prefixes the serialized data of this class */
    public static final String MARKER = ":data_uri_marker->";
    /** identifier of the recipe */
    String recipeID;
    /** a URI in a concrete external data mechanism (an RMap name, for instance) */
    String uri;
    /** entry in the URI */
    String entry;
}
        

This class points to the location of the real data; it is serialized into the blob field and deserialized from it.

An instance of the class is created and bound to a concrete FlowRecipe instance through the recipeID field.

Now, suppose that we defined the interface:


public interface IExternalStateInfo {
    
    /** adds info */
    void addInfo(DataURI data);
    
    /** get info by identifier of recipe */
    DataURI get(String id);
}        

which accepts DataURI instances and maps them using DataURI#recipeID as the key.

For example:

static private IExternalStateInfo stateInfo = new IExternalStateInfo() {
    Map<String,DataURI> infoMap = new HashMap<>();
    @Override
    public void addInfo(DataURI info) {
        infoMap.put(info.recipeID,info);
    }

    @Override
    public DataURI get(String id) {
        return infoMap.get(id);
    }
};        

The DataURI information is also sent by the configuration service and is accepted and mapped by the SmartRouter service.

Configuring with FlowRecipe and external data mapping


The service will keep the FlowStateData in the external storage if a DataURI instance is mapped for the current FlowStateData#recipeId.

We only need to create serializers for FlowStateData that take the DataURI mapping into account:

private static FlowStateData fromSerialized(StageExecutedEvent event) {
    String strBlob = new String(event.getBlob());
    if (strBlob.startsWith(DataURI.MARKER)) {
        // strip the marker prefix; the rest is the serialized DataURI reference
        strBlob = strBlob.substring(DataURI.MARKER.length());
        DataURI info = IJSONSerializeable.load(strBlob, DataURI.class);
        return Cache.instance().retrieve(info.uri, info.entry);
    }
    // no marker: the blob carries the state itself
    return IJSONSerializeable.load(strBlob, FlowStateData.class);
}

private static byte[] serialize(FlowStateData data, IExternalStateInfo info) {
    DataURI dataInfo = info.get(data.getRecipeId());
    if (dataInfo == null) {
        // no external mapping for this recipe: serialize the state inline
        return IJSONSerializeable.serialize(data);
    }
    // keep the state in the external storage and send only the marked reference
    Cache.instance().addObject(dataInfo.getUri(), dataInfo.getEntry(), data);
    return IJSONSerializeable.serialize(DataURI.MARKER, dataInfo);
}
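The marker technique used by these serializers can be tried in isolation. The following is a minimal sketch under stated assumptions: the state is a plain JSON string, an in-memory map named STORE stands in for Redis, and the class name BlobReferenceSketch and the storeKey parameter are hypothetical and not part of the article's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the "marker + reference" blob encoding. */
final class BlobReferenceSketch {
    static final String MARKER = ":data_uri_marker->";

    /** In-memory stand-in for the external storage (Redis in the article). */
    static final Map<String, String> STORE = new HashMap<>();

    /** Returns the state inline, or stores it externally and returns a marked reference. */
    static byte[] serialize(String stateJson, String storeKey) {
        if (storeKey == null) {
            return stateJson.getBytes(StandardCharsets.UTF_8);
        }
        STORE.put(storeKey, stateJson);
        return (MARKER + storeKey).getBytes(StandardCharsets.UTF_8);
    }

    /** Resolves a marked reference through the store; otherwise the blob is the state itself. */
    static String deserialize(byte[] blob) {
        String text = new String(blob, StandardCharsets.UTF_8);
        if (text.startsWith(MARKER)) {
            String key = text.substring(MARKER.length()); // drop the marker, keep the reference
            return STORE.get(key);
        }
        return text;
    }

    public static void main(String[] args) {
        String state = "{\"recipeId\":\"buyShares\",\"stage\":1}";
        byte[] inline = serialize(state, null);
        byte[] reference = serialize(state, "buyShares/f34b39d4");
        System.out.println(inline.length + " bytes inline, " + reference.length + " bytes as a reference");
        System.out.println(deserialize(reference).equals(state));
    }
}
```

The size of the reference does not depend on the size of the state, which is the whole point: the message stays small however much commonFlowData grows.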

and use them in the internal transformations:

private static StageExecutionCommand nextRequestInProgress(FlowStateData data, FlowRecipe recipe) {
    List<StageRecipe> stages = recipe.getStages();
    if (stages.size() <= data.getStage()) {
        return null;
    }
    int stageId = data.next().getStage();
    StageRecipe nextRecipe = stages.get(stageId);
    byte[] serialized = serialize(data, getExternalInfo());
    StageExecutionCommand result = new StageExecutionCommand(serialized, nextRecipe.getCommandId(), data.getUuid());
    result.setReason(data.getExecutionStatus());
    result.setOperation(nextRecipe.getCommandId());
    result.setTargetUrI(nextRecipe.getServiceURI());
    Map<String, Object> nextData = data.getCommonFlowData();
    Map<String, Object> actualData = result.getParameters();
    nextRecipe.getInputParamsMapping().forEach((k, v) -> {
        actualData.put(v, nextData.get(k));
    });
    return result;
}        

What do we gain by using external data storage?

It is useful when running the event-driven model over Kafka, where it helps avoid throughput degradation.

Does this principle increase performance in general, or save CPU time by reducing the amount of serialization? No. Using any external storage involves serialization by definition. So the only gain is that it handles the framework's message size problem.

An example of the RECIPE, FAT EVENTS, and SmartRouter realization.

Let us suppose that we have a group of microservices that sell stock shares upon request. There are share sellers (owners) and buyers.

Every user (seller or buyer) has a money account, where funds are kept, and a share holder account, which keeps all purchased shares.

A seller publishes shares for buyers; a package of shares can be queried by share ID and cost amount.

So, there are the following services, each with its own database:

Microservices serving shares selling

The Money Account Service manages the funds of the participants.

The Shares for Sale Service queries shares by identifier and package cost.

The Share Account Service manages the shares of the participants.

A share purchase is realized by the following flow:

  • find the share package and its owner by the share identifier and cost
  • lock the sum of the package cost on the buyer's account
  • lock the shares on the share account of the seller (owner)
  • transfer money from the buyer's account to the owner's account
  • transfer shares from the owner's account to the buyer's account.

First, let us define the recipe that describes the flow; it contains the stages, the input and output parameter mappings, and the map of common storage parameters:

{
  "recipeId": "buyShares",
  "recipeRouterURI": "routerQ",
  "stages": [
    {
      "commandId": "findShares",
      "serviceURI": "queryQ",
      "transactional": false,
      "inputParamsMapping": {
        "deal.shareID": "shareID",
        "deal.amount": "amount"
      },
      "outputParamsMapping": {
        "amount": "deal.amount",
        "shareID": "deal.shareID",
        "ownerID": "deal.ownerID"
      }
    },
    {
      "commandId": "lockFunds",
      "serviceURI": "moneyAccountQ",
      "transactional": false,
      "inputParamsMapping": {
        "deal.buyerID": "buyerID",
        "deal.amount": "amount"
      },
      "outputParamsMapping": {
        "buyerID": "deal.buyerID",
        "locked": "deal.lockedFunds"
      }
    },
    {
      "commandId": "lockShares",
      "serviceURI": "shareAccountQ",
      "transactional": false,
      "inputParamsMapping": {
        "deal.ownerID": "ownerID",
        "deal.amount": "amount"
      },
      "outputParamsMapping": {
        "ownerID": "deal.ownerID",
        "locked": "deal.lockedShares"
      }
    },
    {
      "commandId": "transferFunds",
      "serviceURI": "moneyAccountQ",
      "transactional": false,
      "inputParamsMapping": {
        "deal.ownerID": "ownerID",
        "deal.buyerID": "buyerID",
        "deal.lockedFunds": "locked",
        "deal.amount": "amount"
      },
      "outputParamsMapping": {
        "locked": "deal.lockedFunds"
      }
    },
    {
      "commandId": "transferShares",
      "serviceURI": "shareAccountQ",
      "transactional": false,
      "inputParamsMapping": {
        "deal.ownerID": "ownerID",
        "deal.buyerID": "buyerID",
        "deal.amount": "amount"
      },
      "outputParamsMapping": {
        "locked": "deal.lockedShares"
      }
    }
  ],
  "inParamsMap": {
    "shares": "deal.shareID",
    "clientID": "deal.buyerID",
    "sum": "deal.amount"
  },
  "outParamsMap": {
    "deal.shareID": "shares",
    "deal.ownerID": "from",
    "deal.buyerID": "clientID",
    "deal.amount": "sum"
  }
}        

The recipe defines: the input and output parameters of the workflow trigger and their mapping onto the common parameter storage (inParamsMap); the mapping of common parameters onto the service call contracts (inputParamsMapping); the mapping of the output parameters returned by services back onto the common parameters (outputParamsMapping); and the mapping of the necessary common parameters onto the final result (outParamsMap).
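To make the two stage-level mappings concrete, here is a minimal sketch of how they could be applied to the findShares stage of the recipe. The class and method names (ParamMappingSketch, toCommandParams, mergeEventOutput) are hypothetical; only the mapping keys and values come from the recipe.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of applying inputParamsMapping and outputParamsMapping. */
final class ParamMappingSketch {

    /** For each (commonKey -> paramName) pair, copies the common value under paramName. */
    static Map<String, Object> toCommandParams(Map<String, String> inputParamsMapping,
                                               Map<String, Object> commonFlowData) {
        Map<String, Object> params = new HashMap<>();
        inputParamsMapping.forEach((commonKey, paramName) ->
                params.put(paramName, commonFlowData.get(commonKey)));
        return params;
    }

    /** For each (paramName -> commonKey) pair, stores the event value under commonKey. */
    static void mergeEventOutput(Map<String, String> outputParamsMapping,
                                 Map<String, Object> eventParams,
                                 Map<String, Object> commonFlowData) {
        outputParamsMapping.forEach((paramName, commonKey) ->
                commonFlowData.put(commonKey, eventParams.get(paramName)));
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("deal.shareID", "Coca-Cola_123");
        common.put("deal.buyerID", "somebody@gmail.com");
        common.put("deal.amount", 1200000.0);

        // Common data -> parameters of the findShares command
        Map<String, Object> params = toCommandParams(
                Map.of("deal.shareID", "shareID", "deal.amount", "amount"), common);
        System.out.println(params.get("shareID") + " " + params.get("amount"));

        // Parameters of the findShares event -> common data (adds deal.ownerID)
        Map<String, Object> eventParams = new HashMap<>(params);
        eventParams.put("ownerID", "owner@outlook.com");
        mergeEventOutput(
                Map.of("amount", "deal.amount", "shareID", "deal.shareID", "ownerID", "deal.ownerID"),
                eventParams, common);
        System.out.println(common.get("deal.ownerID"));
    }
}
```

Because services see only their own parameter names (shareID, amount), they stay unaware of the common storage keys (deal.shareID, deal.amount); the renaming lives entirely in the recipe.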

Let us send the trigger to the router:


{
  "operation": "buyShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "shares": "Coca-Cola_123",
    "clientID": "somebody@gmail.com",
    "sum": 1200000.0
  },
  "clientUri": "uri"
}        

By the definition of stage 0, the SmartRouter returns the following command:

{
  "operation": "findShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "shareID": "Coca-Cola_123"
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6e30sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjB9",
  "transactionData": {},
  "targetUrI": "queryQ",
  "reason": "progress"
}        

The Shares for Sale Service has a handler for this command:

       CommandHandlingHub.inventory.addCommandHandling(FindSharesStageRecipe.COMMAND, FindShares.class, (in, mapTx) -> {
            ShareFound found = new ShareFound();
            found.setOwnerID("owner@outlook.com");
            found.setShareID(in.getShareID());
            found.setAmount(in.getAmount());
            return found;
        }, null);        

and triggers the following event once the command is processed:

{
  "operation": "findShares",
  "uuid":"f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "shareID": "Coca-Cola_123",
    "amount": 1200000.0,
    "ownerID": "owner@outlook.com"
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6e30sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjB9",
  "transactionData": {},
  "errorCode": null
}        

The owner ID of the shares is returned, and it will be kept in the state data.

The SmartRouter handles the event and advances the stage number; the next command is

{
  "operation": "lockFunds",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "buyerID": "somebody@gmail.com"
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTM2MTJhNWEtOGVmYS00YTFjLWFiMTEtYzAxZGE1YTllNjE4IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjB9LCJ0cmFuc2FjdGlvbmFsRGF0YSI6eyJmaW5kU2hhcmVzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjF9",
  "transactionData": {},
  "targetUrI": "moneyAccountQ",
  "reason": "progress"
}        

The FlowStateData serialized in the blob has the value

{
  "recipeId": "buyShares",
  "clientURI": "uri",
  "errorMesage": null,
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "commonFlowData": {
    "deal.shareID": "Coca-Cola_123",
    "deal.buyerID": "somebody@gmail.com",
    "deal.amount": 1200000.0,
    "deal.ownerID": "owner@outlook.com"
  },
  "transactionalData": {
    "findShares": {}
  },
  "executionStatus": "progress",
  "stage": 1
}        

Now it contains the deal.ownerID value, which will be used in later operations after being mapped onto the related service call contract.
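The blob values in these messages appear as Base64 text, which is how JSON serializers such as Jackson render a byte[] field by default (an assumption about the transport used here). To inspect a blob, it is enough to decode it. The sketch below decodes the first 32 characters shared by all the blobs shown in this article; the class name BlobInspector is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for looking inside a Base64-encoded blob. */
final class BlobInspector {

    /** Decodes Base64 text into the UTF-8 string it carries. */
    static String decode(String base64Blob) {
        return new String(Base64.getDecoder().decode(base64Blob), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Common prefix of the blobs in the commands and events of this example
        System.out.println(decode("eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIs"));
        // prints {"recipeId":"buyShares",
    }
}
```

Decoding a complete blob the same way yields the full FlowStateData JSON, which is handy when debugging a flow from broker logs.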

The "lockFunds" command is sent to the Money Account Manager service, where the handler is defined:

       CommandHandlingHub.inventory.addCommandHandling(LockFundsStageRecipe.COMMAND, LockFunds.class, (in, mapTx) -> {
            FundsLocked locked = new FundsLocked();
            locked.setBuyerID(in.getBuyerID());
            locked.setLocked(in.getAmount());
            return locked;
        }, null);        


It triggers the following event to the SmartRouter once the command is handled:

{
  "operation": "lockFunds",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "buyerID": "somebody@gmail.com",
    "locked": 1200000.0
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiNGI4MTBmZmQtOTdmOS00OGFlLTlmZTUtNGM1Y2IwODgzZTgzIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIn0sInRyYW5zYWN0aW9uYWxEYXRhIjp7ImZpbmRTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6MX0=",
  "transactionData": {},
  "errorCode": null
}        

The SmartRouter handles the event and, using the related StageRecipe, updates FlowStateData#commonFlowData:

{
  "recipeId": "buyShares",
  "clientURI": "uri",
  "errorMesage": null,
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "commonFlowData": {
    "deal.shareID": "Coca-Cola_123",
    "deal.buyerID": "somebody@gmail.com",
    "deal.amount": 1200000.0,
    "deal.ownerID": "owner@outlook.com",
    "deal.lockedFunds": 1200000.0
  },
  "transactionalData": {
    "findShares": {},
    "lockFunds": {}
  },
  "executionStatus": "progress",
  "stage": 2
}        

It then progresses to the next stage and generates a new command:


{
  "operation": "lockShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "ownerID": "owner@outlook.com"
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiZDUxMDg1Y2EtNjlkMC00ZjI2LTljYTktNzMyYTgwMzJmOTM0IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e319LCJleGVjdXRpb25TdGF0dXMiOiJwcm9ncmVzcyIsInN0YWdlIjoyfQ==",
  "transactionData": {},
  "targetUrI": "shareAccountQ",
  "reason": "progress"
}        

The command is sent to the Share Account Service.

The Share Account Service has the following handler for this command:

      CommandHandlingHub.inventory.addCommandHandling(LockShareStageRecipe.COMMAND, LockShares.class, (in, mapTx) -> {
            SharesLocked locked= new SharesLocked();
            locked.setLocked(in.getAmount());
            locked.setOwnerID(in.getOwnerID());
            return locked;
        }, null);        

The service sends back the following event once the command is processed:

{
  "operation": "lockShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "ownerID": "owner@outlook.com",
    "locked": 1200000.0
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYmQ1ZWU2ODMtNTg5NS00OTliLWJkZDUtZTQ0MzkzYjhiY2YyIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e319LCJleGVjdXRpb25TdGF0dXMiOiJwcm9ncmVzcyIsInN0YWdlIjoyfQ==",
  "transactionData": {},
  "errorCode": null
}        

The SmartRouter processes this event and updates the FlowStateData to the value

{
  "recipeId": "buyShares",
  "clientURI": "uri",
  "errorMesage": null,
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "commonFlowData": {
    "deal.shareID": "Coca-Cola_123",
    "deal.buyerID": "somebody@gmail.com",
    "deal.amount": 1200000.0,
    "deal.ownerID": "owner@outlook.com",
    "deal.lockedFunds": 1200000.0,
    "deal.lockedShares": 1200000.0
  },
  "transactionalData": {
    "findShares": {},
    "lockFunds": {},
    "lockShares": {}
  },
  "executionStatus": "progress",
  "stage": 3
}        

It then moves to the next StageRecipe and forms a new command:

{
  "operation": "transferFunds",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "ownerID": "owner@outlook.com",
    "buyerID": "somebody@gmail.com",
    "locked": 1200000.0
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYmQ1ZWU2ODMtNTg5NS00OTliLWJkZDUtZTQ0MzkzYjhiY2YyIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6M30=",
  "transactionData": {},
  "targetUrI": "moneyAccountQ",
  "reason": "progress"
}        

The command is sent to the Money Account Manager Service, where the related handler is defined:

       CommandHandlingHub.inventory.addCommandHandling(TransferFundsStageRecipe.COMMAND, TransferFunds.class, (in, mapTx) -> {
            FundsTransferred transferred= new FundsTransferred();
            transferred.setOwnerID(in.getOwnerID());
            transferred.setBuyerID(in.getBuyerID());
            transferred.setAmount(in.getAmount());
            transferred.setLocked(0);
            return transferred;
        }, null);        

The service fires the following event once the handler is executed:

{
  "operation": "transferFunds",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "buyerID": "somebody@gmail.com",
    "ownerID": "owner@outlook.com",
    "locked": 0.0
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiYTQ4ZmQzZjAtOWFkNi00ODI0LThmODItZDUwZjUzNGU1NTlkIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MTIwMDAwMC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9fSwiZXhlY3V0aW9uU3RhdHVzIjoicHJvZ3Jlc3MiLCJzdGFnZSI6M30=",
  "transactionData": {},
  "errorCode": null
}        

The SmartRouter handles this event, progresses to the last stage, and updates the FlowStateData:

{
  "recipeId": "buyShares",
  "clientURI": "uri",
  "errorMesage": null,
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "commonFlowData": {
    "deal.shareID": "Coca-Cola_123",
    "deal.buyerID": "somebody@gmail.com",
    "deal.amount": 1200000.0,
    "deal.ownerID": "owner@outlook.com",
    "deal.lockedFunds": 0.0,
    "deal.lockedShares": 1200000.0
  },
  "transactionalData": {
    "findShares": {},
    "lockFunds": {},
    "lockShares": {},
    "transferFunds": {}
  },
  "executionStatus": "progress",
  "stage": 4
}        

It then returns the next, and last, command in our workflow (stage == 4):

{
  "operation": "transferShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "ownerID": "owner@outlook.com",
    "buyerID": "somebody@gmail.com"
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiNjlhM2FlOTMtY2YyYi00MDQ0LWI4ZDQtMWVkNzk5ZDQ5ZGU0IiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9LCJ0cmFuc2ZlckZ1bmRzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjR9",
  "transactionData": {},
  "targetUrI": "shareAccountQ",
  "reason": "progress"
}        

The Share Account Service has the handler for this command:

      CommandHandlingHub.inventory.addCommandHandling(TransferSharesStageRecipe.COMMAND, TransferShare.class, (in, mapTx) -> {
            ShareTransferred transferred = new ShareTransferred();
            transferred.setAmount(in.getAmount());
            transferred.setOwnerID(in.getOwnerID());
            transferred.setBuyerID(in.getBuyerID());
            transferred.setLocked(0);
            return transferred;
        }, null);        


The service processes the command with this handler and fires back this event for the SmartRouter:

{
  "operation": "transferShares",
  "uuid": "f34b39d4-7d32-4e1a-880a-b9a302e46e4d",
  "parameters": {
    "amount": 1200000.0,
    "buyerID": "somebody@gmail.com",
    "ownerID": "owner@outlook.com",
    "locked": 0.0
  },
  "blob": "eyJyZWNpcGVJZCI6ImJ1eVNoYXJlcyIsImNsaWVudFVSSSI6InVyaSIsImVycm9yTWVzYWdlIjpudWxsLCJ1dWlkIjoiZjM0YjM5ZDQtN2QzMi00ZTFhLTg4MGEtYjlhMzAyZTQ2ZTRkIiwiY29tbW9uRmxvd0RhdGEiOnsiZGVhbC5zaGFyZUlEIjoiQ29jYS1Db2xhXzEyMyIsImRlYWwuYnV5ZXJJRCI6InNvbWVib2R5QGdtYWlsLmNvbSIsImRlYWwuYW1vdW50IjoxMjAwMDAwLjAsImRlYWwub3duZXJJRCI6Im93bmVyQG91dGxvb2suY29tIiwiZGVhbC5sb2NrZWRGdW5kcyI6MC4wLCJkZWFsLmxvY2tlZFNoYXJlcyI6MTIwMDAwMC4wfSwidHJhbnNhY3Rpb25hbERhdGEiOnsiZmluZFNoYXJlcyI6e30sImxvY2tGdW5kcyI6e30sImxvY2tTaGFyZXMiOnt9LCJ0cmFuc2ZlckZ1bmRzIjp7fX0sImV4ZWN1dGlvblN0YXR1cyI6InByb2dyZXNzIiwic3RhZ2UiOjR9",
  "transactionData": {},
  "errorCode": null
}        

The SmartRouter handles this event and, because there are no more stages, forms the result to be returned to the caller:

{
  "operation": "buyShares",
  "uuid": "4531692c-c580-40d4-b30f-3906bfa6b090",
  "parameters": {
    "shares": "Coca-Cola_123",
    "clientID": "somebody@gmail.com",
    "from": "owner@outlook.com",
    "sum": 1200000.0
  }
}        


More complicated cases of RECIPE with FAT EVENTS AND ROUTING.

We considered only the simplest cases of business logic, where the next stage depends on the success or failure of the current stage. These cases cover a broad spectrum of real industrial tasks. However, there can be more complicated situations, where the next stage depends on the result of the current stage, for example, on the type or status of a client's account.

Could we handle such cases? Yes, we can: we need to use rules in the Recipe. But continuing the theme here would make this article too long. I'll try to demonstrate this in the next one.



