Implementation of Saga orchestration using MassTransit

Implementation of Saga orchestration using MassTransit

n this article, we will explore the Saga orchestration pattern and demonstrate how it works through a simple scenario. Previously, MassTransit covered how to send a message using MassTransit and RabbitMQ and provided code on GitHub. If you have any questions, please feel free to visit my website and contact me. Let's begin!

What is Saga?

Based on the Microsoft definition,?the Saga design pattern is a way to manage data consistency across microservices in distributed transaction scenarios. A saga is a sequence of transactions that updates each service and publishes a message or event to trigger the next transaction step. If a step fails, the saga executes compensating transactions that counteract the preceding transactions. (1)


If one of the services rejects the incoming messages that have already been stored in the producer database, the producer will remove the incoming messages. As is well-known, in monolithic services, all transactions are?ACID-compliant (Atomicity, Consistency, Isolation, Durability). However, in microservices, ACID is one of the mandatory sections that should be implemented accurately. The Saga pattern is a technique that could help us handle data consistency between different databases.

The Saga pattern could be implemented in two ways:?Choreography?and?Orchestration. So, let’s explore these two processes and determine which one of them could be used based on our requirements.

Choreography

In choreography, each service performs its transaction and directly sends messages to other services without centralization for controlling all incoming and outgoing messages. For instance, if Service A attempts to store and send a message to Service B, but Service B rejects the incoming message, Service B will notify Service A directly, prompting Service A to perform a specific function.


Choreography is well-suited for simple services that do not require coordination or do not work with a database, such as Email Services, SMS services, etc., because as the number of services increases, managing consistency becomes more challenging.

Fig 01 — Saga Choreography
Fig 01 — Saga Choreography

Orchestration

In this type, the?orchestrator?handles all incoming and outgoing messages and determines which operation should be performed based on the incoming?event. It is particularly suitable for complex transactions involving multiple services, and it excels at ensuring data consistency, which is a critical aspect of a transaction.

Fig 02— Saga Orchestration
Fig 02— Saga Orchestration

Now, let’s find out how?Masstransit?aids in implementing the?saga pattern. In the previous article, MassTransit was covered, explaining how it works in a microservices architecture and highlighting that Masstransit is the easiest way to work with saga patterns. (2)

MassTransit and Saga

One of the great features of MassTransit is supporting saga orchestration and making the implementation process easier. It is compatible with almost all of the databases and message brokers and could be configured easily. based on the MassTransit documentation explanation:


A saga is a long-lived transaction managed by a coordinator. Sagas are initiated by an event, sagas orchestrate events, and sagas maintain the state of the overall transaction. Sagas are designed to manage the complexity of a distributed transaction without locking and immediate consistency. They manage state and track any compensations required if a partial failure occurs. (3)


Before starting, let’s talk about the scenario that is going to implement since it makes clearer the process that is going to happen.

Scenario

In this scenario, three services are going to be used to illustrate the process of performing the saga pattern and managing the consistency of data. The services are:


  • TicketService: Responsible for registering a ticket
  • GenerateTicketService: Responsible for generating a ticket
  • EmailService: Responsible for sending ticket information to the user (to keep the example simple,?logging?information will be used)

Fig 03 — Saga Orchestration in the scenario
Fig 03 — Saga Orchestration in the scenario

Based on?Fig 03, a client sends a request to?TicketService?to register a ticket. This triggers an?AddTicketEvent?which sends a message to?GenerateTicketService. If the message has valid values,?GenerateTicketService?sends a message to?EmailService?to send a confirmation email to the client that includes all ticket information. If either?GenerateTicketService?or?EmailService?receives an invalid message, a?Cancel event?occurs, and all incoming data is removed from their databases.

Prerequires

  • C# and .NET Core
  • Visual Studio or VSCode
  • Entity Framework core or any other ORMs
  • SQL or NoSQL databases
  • MassTransit
  • RabbitMQ
  • Docker
  • Postman

After the above basic explanation, let’s move forward to the implementation of Saga. So, let’s get started coding.

Note: Saga could be implemented using InMemory or Databases. In this example, databases would be used.

Step 1

There are four.NET API?projects and three?class libraries?in this scenario as you can see in Figure 4.

Fig 04 — API projects and Class Libraries
Fig 04 — API projects and Class Libraries

The?MessageBrokers?class library is responsible for all message brokers that are going to be used in our services. Here,?RabbitMQ?is our choice to make communication between our services.

First of all, add the three below classes to the?MessageBrokers?class library.

RabbitMQQueues:?It is responsible for storing queues in a const variable. So, add the following code.


public class RabbitMQQueues
{
    public const string SagaBusQueue = "Saga-Queue";
}        

RabbitMQConfig:?It?is responsible for storing the configuration of RabbitMQ in a const variable.

public class RabbitMQConfig
{
    public const string RabbitMQURL = "rabbitmq://localhost/";
    public const string UserName = "guest";
    public const string Password = "guest";
}        

RabbitMQ:?It?makes a configuration bus of RabbitMQ to use in all of the services that exist in the service.

public class RabbitMQ
{
    public static IBusControl ConfigureBus(
        IServiceProvider serviceProvider, Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> action = null)
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
           cfg.Host(new Uri(RabbitMQConfig.RabbitMQURL), hst =>
            {
                hst.Username(RabbitMQConfig.UserName);
                hst.Password(RabbitMQConfig.Password);
            });
              
            cfg.ConfigureEndpoints(serviceProvider.GetRequiredService<IBusRegistrationContext>());
        });
    }
}        

Note:?Each class library could be generated as a NuGet package.

Another class library that will be implemented is?Events. Here, all the events that will occur in this example exist. It can be added to each one of the projects directly. But to have clean code, it would be better to split everything.

Step 2

In this step, the?Event?class library will be implemented. So, based on?Figure 3, in this scenario, five events will be required. It could be implemented by more than 5 events. first, add a folder and call it?TicketEvents?and add the following classes to this folder.


IAddTicketEvent: This event will be published when a ticket is created.

public interface IAddTicketEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

IGenerateTicketEvent:?This event will be published when ticket information such as ticket number is generated.

public interface IGenerateTicketEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

ICancelGenerateTicketEvent: This event will be published when generating ticket is canceled due to the age condition which in here?age?should not be?more than 80.

public interface ICancelGenerateTicketEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

IGETValueEvent:?This class will be published when a client sends a request to the?TicketService. While a request is sent, this event will occur and publish?IAddTicketEvent?to the?saga machine.

// This event is not going to use in the State machine 
// It will used in the first service which here is TicketService
public interface IGETValueEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

Now, add another folder, call it?SendEmailEvents,?and create the following classes.

ISendEmailEvent:?This event will be published when ticket information is generated. If the logic of age is less than 80, this event will publish.

public interface ISendEmailEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

ICancelSendEmailEvent:?This event will be published when sending email logic failed. In this scenario, the location should not be?London. So, if the location is?London, this event will publish.

public interface ICancelSendEmailEvent
{
    public Guid TicketId { get; }
    public string Title { get; }
    public string Email { get; }
    public DateTime RequireDate { get; }
    public int Age { get; }
    public string Location { get; }
    public string TicketNumber { get; }
}        

Step 3

Here, the?TicketService?should be implemented. So, a couple of the?NuGet packages?will be required.


  • Install-Package Microsoft.EntityFrameworkCore
  • Install-Package Microsoft.EntityFrameworkCore.Design
  • Install-Package Microsoft.EntityFrameworkCore.SqlServer
  • Install-Package Microsoft.EntityFrameworkCore.Tools
  • Install-Package AutoMapper -Version 12.0.1
  • Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection

Now, the?Events?and?MessageBrokers?class libraries should be added to the references of?TicketService. Add the following code in the?terminal?(CTRL + `)?of?visual studio?or?VSCode.

dotnet add TicketService reference Events/Events.csproj

dotnet add TicketService reference MessageBrokers/ MessageBrokers.csproj

Furthermore, to add these class libraries to the other services like the same one that has been done before, use the below command.

dotnet add GenerateTicket reference Events/Events.csproj

dotnet add GenerateTicket reference MessageBrokers/ MessageBrokers.csproj

dotnet add EmailService reference Events/Events.csproj

dotnet add EmailService reference MessageBrokers/ MessageBrokers.csproj

dotnet add SagaStateMachine reference Events/Events.csproj

dotnet add SagaService reference Events/Events.csproj

Now, the model should be added to the?TicketService. So, add a folder and call it?Models?and create a class and call it?Ticket?and add the following properties.

public class Ticket
{
    public string TicketId { get; set; }
    public string Title { get; set; }
    public string Email { get; set; }
    public DateTime RequireDate { get; set; }
    public int Age { get; set; }
    public string Location { get; set; }
    public string TicketNumber { get; set; }
    public DateTime CreatedDate { get; set; }
}        

Now, create the?DbContext?class in the?Models?folder.

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
    {

    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
       base.OnModelCreating(modelBuilder);
    }

    public DbSet<Ticket> Ticket { get; set; }
}        

In the?appsettings.json?file, add a?connection string.

"ConnectionStrings": {
    "DbConnection" : "server=.;database=Ticket_SagaMedium;Trusted_Connection=true"
  }        

Then register the?AppDbContext?file to the?program.cs?file.

using Microsoft.EntityFrameworkCore;
using TicketService.Models;

// Connection string
var connectionString = builder.Configuration.GetConnectionString("DbConnection");

// Register AppDbContext
builder.Services.AddDbContextPool<AppDbContext>(db => db.UseSqlServer(connectionString));        

Then use?migration?commands.

Add-Migration TicketModel

Update-Database

Now, a?DTO?object will be required to get the data from the client. So, add a new folder and call it?DTO?and create a new class and call it?AddTicketDTO.

using System.ComponentModel.DataAnnotations;

public class AddTicketDTO
{
    public string TicketId { get; set; } = Guid.NewGuid().ToString();   
    public string Title { get; set; }
    [Required]
    public string Email { get; set; }
    public DateTime RequireDate { get; set; }
    public int Age { get; set; }
    public string Location { get; set; }
    public DateTime CreatedDate { get; set; } = DateTime.Now;
}        

Moreover, to?pass the response to the client?or?send a specific object to the bus, create another class in the?DTO?folder and call it?ResponseTicketDTO?and add the following properties to it.

public class ResponseTicketDTO
{
    public string TicketId { get; set; }
    public string Title { get; set; }
    public string Email { get; set; }
    public DateTime RequireDate { get; set; }
    public int Age { get; set; }
    public string Location { get; set; }
    public string TicketNumber { get; set; }
    public DateTime CreatedDate { get; set; }
}        

Now, mapping configuration is required. So, add a folder in the root of the project and call it?Common?and add another folder in it and call it?Mapping,?then create a class and call it?TicketMapping?and add the following code to it.

using AutoMapper;
using TicketService.DTO;
using TicketService.Models;

public class TicketMapping : Profile
{
    public TicketMapping()
    {
        CreateMap<AddTicketDTO, Ticket>();
         CreateMap<Ticket, ResponseTicketDTO>();
    }
}        

Then, register?AutoMapper?in the?Program.cs?file.

// Register AutoMapper
builder.Services.AddAutoMapper(AppDomain.CurrentDomain.GetAssemblies());        

Now, the service that is responsible for adding a ticket should be created. So, in the root of the project, add a folder and call it?Services?and add an interface and call it?ITicketServices?and add the following code in it.

public interface ITicketServices
{
    Task<Ticket> AddTicket(Ticket ticket);
    bool DeleteTicket(string TicketId);

    // Other methods like Update could be implemented 
}        

Then, the implementation of the interfaces should be added. So, create another call in this folder and create a class and call it?TicketServices?and add the following code.

public class TicketServices : ITicketServices
{
    private readonly AppDbContext _dbContext;

    public TicketServices(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async Task<Ticket> AddTicket(Ticket ticket)
    {
        if(ticket is not null)
        {
            await _dbContext.Ticket.AddAsync(ticket);
            await _dbContext.SaveChangesAsync();
        }
        return ticket;
    }

    public bool DeleteTicket(string TicketId)
    {
        var ticketObj = _dbContext.Ticket.FirstOrDefault(t=>t.TicketId == TicketId);
        if(ticketObj is not null)
        {
            _dbContext.Ticket.Remove(ticketObj);
            _dbContext.SaveChanges();
            return true;
        }
        return false;
    }
}        

Note:?If need more information about the repository pattern, visit the previous articles about the repository (4)

Now register this?TicketService?in the?Program.cs?file.

// Register Ticket Service
builder.Services.AddScoped<ITicketServices, TicketServices>();        

Now, add a controller to make the clients able to send a request to the endpoint to add a new ticket. So, create a new controller and call it TicketController and add the following code.

[Route("api/[controller]")]
[ApiController]
public class TicketController : ControllerBase
{
    private readonly ITicketServices _ticketServices;
    private readonly IMapper _mapper;
    private readonly IBus _bus;

    public TicketController(ITicketServices ticketServices, IMapper mapper, IBus bus)
    {
        _ticketServices = ticketServices;
        _mapper = mapper;
        _bus = bus;
    }

    [HttpPost]
    public async Task<IActionResult> Post(AddTicketDTO addTicketDTO)
    {
        var mapModel = _mapper.Map<Ticket>(addTicketDTO);

        var res = await _ticketServices.AddTicket(mapModel);

        if (res is not null)
        {
            // map model to the DTO and pass the DTO object to the bus queue
            var mapResult = _mapper.Map<ResponseTicketDTO>(res);
            // Send to the Bus
            var endPoint = await _bus.GetSendEndpoint(new Uri("queue:" + MessageBrokers.RabbitMQQueues.SagaBusQueue));
            await endPoint.Send<IGETValueEvent>(new
            {
                TicketId = Guid.Parse(mapResult.TicketId),
                Title = mapResult.Title,
                Email = mapResult.Email,
                RequireDate = mapResult.RequireDate,
                Age = mapResult.Age,
                Location = mapResult.Location
            });
            return StatusCode(StatusCodes.Status201Created);
        }
        return BadRequest();
     }
}        

As shown in the?TicketController,?IBus?has been used to publish a message. To learn more about the types of Communication,?visit the previous article ?or?visit the document of MassTransit (5)

Now, register?MassTransit?and its?configuration?in the?Program.cs?file.

using MassTransit;

// Registe MassTransit
builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(MessageBrokers.RabbitMQQueues.SagaBusQueue, ep =>
        {
            ep.PrefetchCount = 10;
            // Get Consumer
            ep.ConfigureConsumer<GetValueConsumer>(provider);
        });
    }));

    cfg.AddConsumer<GetValueConsumer>();
});        

Now, add a folder in the root of the project and call it?consumer?and create a new class and call it?GetValueConsumer?and add the following code.

using Events.TicketEvents;
using MassTransit;

public class GetValueConsumer : IConsumer<IGETValueEvent>
{
    private readonly ILogger<GetValueConsumer> _logger;
    public GetValueConsumer(ILogger<GetValueConsumer> logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<IGETValueEvent> context)
    {
        var data = context.Message;
        if (data is not null)
        {
            // This section will publish message to the IAddTicketEvent although the GenerateTicket service has a consumer
            // that it will be listened on the IAddTicketEvent
            await context.Publish<IAddTicketEvent>(new
            {
                TicketId = data.TicketId,
                Title = data.Title,
                Email = data.Email,
                RequireDate = data.RequireDate,
                Age = data.Age,
                Location = data.Location
            });
            _logger.LogInformation("a message has been received");
        }
    }
}        

In the next step,?GenerateTicket?will be implemented.

Note:?The Saga machine will be implemented after this step since this article wants to show the type of orchestration in the Saga state machine of MassTransit.

Step 4

In this step, in the?GenerateTicket?project, first, add some packages that will be required.


  • Install-Package Microsoft.EntityFrameworkCore
  • Install-Package Microsoft.EntityFrameworkCore.Design
  • Install-Package Microsoft.EntityFrameworkCore.SqlServer
  • Install-Package Microsoft.EntityFrameworkCore.Tools
  • Install-Package AutoMapper -Version 12.0.1
  • Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection

Now add a folder and call it?Models?and create a class and call it?TicketInfo?and add the following properties.

using System.ComponentModel.DataAnnotations;

public class TicketInfo
{
    [Key]
    public string TicketId { get; set; }
    public string Email { get; set; }
    public string TicketNumber { get; set; }
    public DateTime CreatedDate { get; set; } = DateTime.Now;
}        

Now, create another class in the?Models?folder and call it?AppDbContext?and add the following code.

using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options): base(options)
    {
                
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
    }
    public DbSet<TicketInfo> TicketInfo { get; set; }
}        

Now, in the?appsettings.json?file, add the?connection string.

"ConnectionStrings": {
    "DbConnection": "server=.;database=TicketInfo_SagaMedium;Trusted_Connection=true"
  }        

Now, in the?Program.cs?file, register?AppDbContext.

using GenerateTicket.Models;
using Microsoft.EntityFrameworkCore;

// Connection string
var connectionString = builder.Configuration.GetConnectionString("DbConnection");

// Register AppDbContext
builder.Services.AddDbContextPool<AppDbContext>(db => db.UseSqlServer(connectionString));        

Then use migration commands.

Add-Migration TicketInfo

Update-Database

Now, add a folder in the root of the project and call it?Service?and create an interface and call it?ITicketInfoService?and add the following code.

public interface ITicketInfoService
{
    Task<TicketInfo> AddTicketInfo(TicketInfo ticketInfo);
    bool RemoveTicketInfo(string TicketId);
}        

Now, add the implementation of the?ITicketInfoService?interface. So, create a class in this folder and call it?TicketInfoService?and add the following code.

public class TicketInfoService : ITicketInfoService
{
    private readonly AppDbContext _dbContext;

    public TicketInfoService(AppDbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async Task<TicketInfo> AddTicketInfo(TicketInfo ticketInfo)
    {
        if(ticketInfo is not null)
        {
            ticketInfo.TicketNumber = StringGenerator.Generate();
            await _dbContext.TicketInfo.AddAsync(ticketInfo);
            await _dbContext.SaveChangesAsync();
        }
        return ticketInfo;
    }

    public bool RemoveTicketInfo(string TicketId)
    {
        var ticketInfoObj = _dbContext.TicketInfo.FirstOrDefault(t => t.TicketId == TicketId);
        if(ticketInfoObj is not null)
        {
            _dbContext.TicketInfo.Remove(ticketInfoObj);
            _dbContext.SaveChanges();
            return true;
        }
        return false;
    }
}        

As shown in the?TicketInfoService, a method has been used to generate a random string as a ticket number. So, add a folder in the root of the project and call it?Common?and create a class and call it?StringGenerator.cs,?and add the following code.

public static class StringGenerator
{
    public static string Generate()
    {
        var random = new Random();
        string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
        string randomString = new string(Enumerable.Repeat(chars, 10)
                                      .Select(s => s[random.Next(s.Length)]).ToArray());
        return randomString;
    }
}        

Now register this service in the?Program.cs?file.

// Register TicketInfo service
builder.Services.AddScoped<ITicketInfoService, TicketInfoService>();        

Now, in the?Common?folder, add another folder and call it?mapping, then create a new class and call it?TicketInfoMapping?and add the following code.

using AutoMapper;
using Events.TicketEvents;
using GenerateTicket.Models;

public class TicketInfoMapping : Profile
{
    public TicketInfoMapping()
    {
        CreateMap<IGenerateTicketEvent, TicketInfo>();
    }
}        

Now register?AutoMapper?in the?Program.cs?file.

// Register AutoMapper
builder.Services.AddAutoMapper(AppDomain.CurrentDomain.GetAssemblies());        

Here, to?retrieve a message?from the?bus, a?consumer?will be required. So, in the root of the project, add a new folder and call it?Consumer?and create a new class and call it?GenerateTicketConsumer?and add the following code.

public class GenerateTicketConsumer : IConsumer<IGenerateTicketEvent>
{
    // As shown, this consumer is listening to the IGenerateTicketEvent
    // But, Ticket Service publish its message to the IAddTicketEvent
    // Here State machine will transform IAddTicketEvent to the IGenerateTicketEvent 
    private readonly ITicketInfoService _ticketInfoService;
    private readonly ILogger<GenerateTicketConsumer> _logger;
    private readonly IMapper _mapper;
    public GenerateTicketConsumer(ITicketInfoService ticketInfoService, ILogger<GenerateTicketConsumer> logger, IMapper mapper)
    {
        _ticketInfoService = ticketInfoService;
        _logger = logger;
        _mapper = mapper;
    }
    public async Task Consume(ConsumeContext<IGenerateTicketEvent> context)
    {
        var data = context.Message;

        if (data is not null)
        {
            // Check if Age is 80 or less
            if (data.Age < 80)
            {
                // Store message
                // Use Mapper or use a ticketinfo object directly
                var mapModel = _mapper.Map<TicketInfo>(data);
                var res = await _ticketInfoService.AddTicketInfo(mapModel);
                if (res is not null)
                {
                    await context.Publish<ISendEmailEvent>(new
                    {
                        TicketId = data.TicketId,
                        Title = data.Title,
                        Email = data.Email,
                        RequireDate = data.RequireDate,
                        Age = data.Age,
                        Location = data.Location,
                        TicketNumber = res.TicketNumber
                    });
                    _logger.LogInformation($"Message sent == TicketId is {data.TicketId}");
                }
            }
            else
            {
                // This section will return the message to the Cancel Event
                await context.Publish<ICancelGenerateTicketEvent>(new
                {
                    TicketId = data.TicketId,
                    Title = data.Title,
                    Email = data.Email,
                    RequireDate = data.RequireDate,
                    Age = data.Age,
                    Location = data.Location
                });
                _logger.LogInformation($"Message canceled== TicketId is {data.TicketId}");
            }
        }
    }
}        

As shown in the above consumer, this class listening?IGenerateTicketEvent?and when this event occurred, this class retrieve the message. Now, as demonstrated in Figure 3, the cancel event is when the?GeneratedTicket?service denies an incoming message that the age is more than 80. So, a consumer in the?TicketService?project will be required when the cancel event occurred.

So, in the?TicketService?and the?Consumers?folder, create another class and call it?GenerateTicketCancelConsumer?and add the following code to retrieve a message when the cancel event has occurred.

public class GenerateTicketCancelConsumer : IConsumer<ICancelGenerateTicketEvent>
{
    private readonly ITicketServices _ticketServices;
    private readonly ILogger<GenerateTicketCancelConsumer> _logger;

    public GenerateTicketCancelConsumer(ITicketServices ticketServices, ILogger<GenerateTicketCancelConsumer> logger)
    {
        _ticketServices = ticketServices;
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<ICancelGenerateTicketEvent> context)
    {
        var data = context.Message;
        if(data is not null)
        {
            var res = _ticketServices.DeleteTicket(data.TicketId.ToString());
            if(res is true)
            {
                _logger.LogInformation("The Ticket has been removed successufully");
            }
            else
            {
                _logger.LogInformation("Failed!!!");
            }
        }
    }
}        

Now, register?GenerateTicketCancelConsumer?in the?Program.cs?file.?So, the whole section is:

// Registe MassTransit
builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(MessageBrokers.RabbitMQQueues.SagaBusQueue, ep =>
        {
            ep.PrefetchCount = 10;
            // Get Consumer
            ep.ConfigureConsumer<GetValueConsumer>(provider);
            // Cancel Consumer
            ep.ConfigureConsumer<GenerateTicketCancelConsumer>(provider);
        });
    }));

    cfg.AddConsumer<GetValueConsumer>();
    cfg.AddConsumer<GenerateTicketCancelConsumer>();
});        

As shown in the?GenerateTicketConsumer?file, this consumer is listening to the?IGenerateTicketEvent,?however, when a ticket is created,?IAddTicketEvent?occurred. So, The?IAddTicketEvent is?transformed by the?saga state machine.?Furthermore, it will be sent a message to the?ISendEmailEvent?when the age is less than 80.

Now, register the?MassTransit?and its?configuration?in the?Program.cs?file in the?GenerateTicket?project.

using GenerateTicket.Consumers;
using MassTransit;

// Register MassTransit 
builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider => MessageBrokers.RabbitMQ.ConfigureBus(provider));
    cfg.AddConsumer<GenerateTicketConsumer>();
});        

In the next step, the email service will be implemented.

Step 5

To keep the scenario simple, there is no database in this service and just logs the information in the consumer. So, first of all, add a new folder in the root of the project and call it?Consumers,?then create a new class and call it?SendEmailConsumer?and add the following code.

using Events.SendEmailEvents;
using MassTransit;

public class SendEmailConsumer : IConsumer<ISendEmailEvent>
{
    private readonly ILogger<SendEmailConsumer> _logger;

    public SendEmailConsumer(ILogger<SendEmailConsumer> logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<ISendEmailEvent> context)
    {
        var data = context.Message;
        if (data is not null)
        {
            if (data.Location == "London")
            {
                await context.Publish<ICancelSendEmailEvent>(new{
                    TicketId = data.TicketId,
                    Title = data.Title,
                    Email = data.Email,
                    RequireDate = data.RequireDate,
                    Age = data.Age,
                    Location = data.Location
                });
                _logger.LogInformation("The location is unavailable");
            }
            else
            {
                _logger.LogInformation("The message has been received ");
            }
        }
    }
}        

Now register this consumer and?MassTransit configuration?in the?Program.cs?file.

using EmailService.Consumers;
using MassTransit;


// Register MassTransit 
builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider => MessageBrokers.RabbitMQ.ConfigureBus(provider));
    cfg.AddConsumer<SendEmailConsumer>();
});        

As shown in the?SendEmailConsumer,?if the location is equal to?London?an?ICancelSendEmailEvent?will occur. So, a consumer will be required in the?GenerateTicket?project to consume the cancelation of sending email and remove its data like the TicketService. Therefore, in the?GenerateTicket?project and the?Consumers?folder, create a new class and call it?CancelSendingEmailConsumer?and add the following code.

To achieve this, in the GenerateTicket project within the Consumers folder, create a new class and name it CancelSendingEmailConsumer. Then, add the following code:

using Events.SendEmailEvents;
using Events.TicketEvents;
using GenerateTicket.Services;
using MassTransit;

public class CancelSendingEmailConsumer : IConsumer<ICancelSendEmailEvent>
{
    private readonly ITicketInfoService _ticketInfoService;
    private readonly ILogger<CancelSendingEmailConsumer> _logger;
    public CancelSendingEmailConsumer(ITicketInfoService ticketInfoService, ILogger<CancelSendingEmailConsumer> logger)
    {
        _ticketInfoService = ticketInfoService;
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<ICancelSendEmailEvent> context)
    {
        var data = context.Message;
        if(data is not null)
        {
            var res = _ticketInfoService.RemoveTicketInfo(data.TicketId.ToString());
            if(res is true)
            {
                await context.Publish<ICancelGenerateTicketEvent>(new
                {
                    TicketId = data.TicketId,
                    Title = data.Title,
                    Email = data.Email,
                    RequireDate = data.RequireDate,
                    Age = data.Age,
                    Location = data.Location
                });
                _logger.LogInformation("The message has been sent to the ICancelGenerateTicketEvent in the TicketService");
            }
            else
            {
                _logger.LogInformation("Failed!!!");
            }
        }
    }
}        

Now, in the?Program.cs?file that exists in the?GenerateTicket?project, add?CancelSendingEmailConsumer?Consumer.

The whole file is :

using GenerateTicket.Consumers;
using MassTransit;

// Register MassTransit 
builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider => MessageBrokers.RabbitMQ.ConfigureBus(provider));
    cfg.AddConsumer<GenerateTicketConsumer>();
    cfg.AddConsumer<CancelSendingEmailConsumer>();
});        

In the upcoming step, the Saga state machine and the Saga service will be implemented.

Step 6

Now, the Saga state machine will be needed to transform the incoming and outgoing messages into other events. So, first of all, in the?SagaStateMachine?class library, add a class and call it?TicketStateData?and add the following properties.

using MassTransit;

public class TicketStateData : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public DateTime TicketCreatedDate { get; set; }
    public DateTime TicketCancelDate { get; set; }
    public Guid TicketId { get; set; }
    public string Title { get; set; }
    public string Email { get; set; }
    public string Location { get; set; }
    public int Age { get; set; }
    public  string TicketNumber { get; set; }
}        

As evident from the above class, this class represents all the data that is required to determine the state of the request received by the SagaStateMachine. The?CorrelationId?is mandatory property acting as the key value of the data.

Create another class and call it?GenerateTicketEvent?and add the following code.

using Events.TicketEvents;

public class GenerateTicketEvent : IGenerateTicketEvent
{
    private readonly TicketStateData _ticketStateData;
    public GenerateTicketEvent(TicketStateData ticketStateData)
    {
        _ticketStateData = ticketStateData;
    }
    public Guid TicketId => _ticketStateData.TicketId;

    public string Title => _ticketStateData.Title;

    public string Email => _ticketStateData.Email;

    public DateTime RequireDate => _ticketStateData.TicketCreatedDate;

    public int Age => _ticketStateData.Age;

    public string Location => _ticketStateData.Location;

    public string TicketNumber => _ticketStateData.TicketNumber;
}        

As shown in the above class, this class inherits from?IGenerateTicketEvent.

Now, the state machine should be implemented. Create another class and call it?TicketStateMachine?and add the following code.

using Events.SendEmailEvents;
using Events.TicketEvents;
using MassTransit;

public class TicketStateMachine : MassTransitStateMachine<TicketStateData>
{
    // 4 states are going to happen
    public State AddTicket { get; private set; }
    public State CancelTicket { get; private set; }
    public State CancelSendEmail { get; private set; }
    public State SendEmail { get; private set; }

    // 4 events are going to happen

    public Event<IAddTicketEvent> AddTicketEvent { get; private set; }
    public Event<ICancelGenerateTicketEvent> CancelGenerateTicketEvent { get; private set; }
    public Event<ICancelSendEmailEvent> CancelSendEmailEvent { get; private set; }
    public Event<ISendEmailEvent> SendEmailEvent { get; private set; }

    public TicketStateMachine()
    {
        InstanceState(s => s.CurrentState);
        Event(() => AddTicketEvent, a => a.CorrelateById(m => m.Message.TicketId));
        Event(() => CancelGenerateTicketEvent, a => a.CorrelateById(m => m.Message.TicketId));
        Event(() => CancelSendEmailEvent, a => a.CorrelateById(m => m.Message.TicketId));
        Event(() => SendEmailEvent, a => a.CorrelateById(m => m.Message.TicketId));

        // A message comming from ticket service 
        // it could be the initially state
        Initially(
            When(AddTicketEvent).Then(context =>
            {
                context.Saga.TicketId = context.Message.TicketId;
                context.Saga.Title = context.Message.Title;
                context.Saga.Email = context.Message.Email;
                context.Saga.TicketNumber = context.Message.TicketNumber;
                context.Saga.Age = context.Message.Age;
                context.Saga.Location = context.Message.Location;
            }).TransitionTo(AddTicket).Publish(context => new GenerateTicketEvent(context.Saga)));
        // During AddTicketEvent some other events might occured 
        During(AddTicket, 
           When(SendEmailEvent)
           .Then(context =>
            {
                // These values could be different 
                context.Saga.TicketId = context.Message.TicketId;
                context.Saga.Title = context.Message.Title;
                context.Saga.Email = context.Message.Email;
                context.Saga.TicketNumber = context.Message.TicketNumber;
                context.Saga.Age = context.Message.Age;
                context.Saga.Location = context.Message.Location;
            }).TransitionTo(SendEmail));
         During(AddTicket,
            When(CancelGenerateTicketEvent)
            .Then(context =>
            {
                // These values could be different 
                context.Saga.TicketId = context.Message.TicketId;
                context.Saga.Title = context.Message.Title;
                context.Saga.Email = context.Message.Email;
                context.Saga.TicketNumber = context.Message.TicketNumber;
                context.Saga.Age = context.Message.Age;
                context.Saga.Location = context.Message.Location;
            }).TransitionTo(CancelTicket));
       // During SendEmailEvent some other events might occured 
       During(SendEmail,
            When(CancelSendEmailEvent)
            .Then(context =>
            {
                // These values could be different 
                context.Saga.TicketId = context.Message.TicketId;
                context.Saga.Title = context.Message.Title;
                context.Saga.Email = context.Message.Email;
                context.Saga.TicketNumber = context.Message.TicketNumber;
                context.Saga.Age = context.Message.Age;
                context.Saga.Location = context.Message.Location;
            }).TransitionTo(CancelSendEmail));
    }
}        

As shown in the above class, four states represent all states that might be required in this incoming request. It could be more than four states, but to keep the scenario simple, just use four states. Furthermore, all interfaces that are created as events should be defined with the Event type of MassTransit in the StateMachine.

As indicated in the ‘Initially’ section, when ‘AddTicket,’ which is an ‘IAddTicketEvent,’ occurs, it is transformed into the ‘GeneratedTicketEvent’ class, which inherits from ‘IGenerateTicketEvent.’ Now, the message that comes from ‘IAddTicketEvent’ is transformed into ‘IGenerateTicketEvent.’ The ‘Transition’ method represents the?current state of a request, and this value will be stored in the database table.

Furthermore, in the ‘During’ section, two events could be published. The ‘Publish’ method has not been used in the ‘During’ section. However, if the ‘Publish’ method is to be used, a class like ‘GenerateTicketEvent’ should be created to receive the incoming messages and pass them to another event.

In the next step, the?SagaService?will be implemented.

Step 7

Now, this service, should be run as a state machine that receives incoming messages and pass them to other services. So, in the?SagaServices, first, add a new folder in the root of the project and call it?Models?and create a class and call it?AppDbContext?and add the following code.

using Microsoft.EntityFrameworkCore;
using SagaStateMachine;

public class AppDbContext : DbContext
    {
       
        public AppDbContext(DbContextOptions<AppDbContext> options): base(options)
        {
        }
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<TicketStateData>().HasKey(x => x.CorrelationId);
            base.OnModelCreating(modelBuilder);
        }
        public DbSet<TicketStateData> TicketStateData{ get;set; }
    }        

As shown in the above class, this class wants to create a table in the database based on the?TicketStateData.

Now, in the?appsettings.json?file, add the?connection string.

"ConnectionStrings": {
    "DbConnection": "server=.;database=StateSaga_SagaMedium;Trusted_Connection=true"
  }        

Then, register?AppDbContext?in the?Program.cs?file.

using Microsoft.EntityFrameworkCore;
using SagaService.Models;

var connectionString = builder.Configuration.GetConnectionString("DbConnection");


// Register SagaContext
builder.Services.AddDbContextPool<AppDbContext>(db => db.UseSqlServer(connectionString));        

Now, use migration commands to create a table to store the?TicketStateData?information.

Add-Migration TicketStateData

Update-Database

Now, register?MassTransit?and its?configuration?in the?Program.cs?file.

using MassTransit;
using SagaStateMachine;

builder.Services.AddMassTransit(cfg =>
{
    cfg.AddBus(provider=> MessageBrokers.RabbitMQ.ConfigureBus(provider));
    cfg.AddSagaStateMachine<TicketStateMachine, TicketStateData>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

            r.ExistingDbContext<AppDbContext>();
        });
});        

Finally, the?Saga machine?is ready to be tested.

Step 8

In this step, the?Saga machine?should be tested. So, run all services and send a request using?Postman.

Fig 05 — Send a request to generate a ticket
Fig 05 — Send a request to generate a ticket
Fig06 — Ticket Table contains a record that has been added
Fig06 — Ticket Table contains a record that has been added
Fig07 — TicketInfo table after consuming the message
Fig07 — TicketInfo table after consuming the message
Fig08 — TicketStateData table in each state will be updated
Fig08 — TicketStateData table in each state will be updated
Fig09 — TicketInfo table after reject by ISendEmailEvent remove this table’s record
Fig09 — TicketInfo table after reject by ISendEmailEvent remove this table’s record
Fig10 — Ticket table after reject by ICancelGenerateTicketEvent remove this table’s record
Fig10 — Ticket table after reject by ICancelGenerateTicketEvent remove this table’s record

Conclusion

Saga is a crucial pattern in microservices architecture and event-driven design, as it ensures consistency in databases, making it a mandatory component. Fortunately, MassTransit plays a significant role in simplifying and accurately implementing Saga orchestration.


References

(1)?https://learn.microsoft.com/en-us/azure/architecture/reference-architectures/saga/saga

(2)?https://blog.devops.dev/microservices-and-masstransit-6665fa3b3687

(3)?https://masstransit.io/documentation/patterns/saga

(4)?https://medium.com/@vahidalizadeh1990/crud-operation-by-repository-pattern-using-net-6-ef-core-sql-server-mysql-mongodb-part-1-afa35f3d35a7

(5)?https://masstransit.io/

GitHub

YouTube

https://youtu.be/dTI4FF7VogU

Medium



Tomasz Szymański

Senior .NET Developer

5 个月

Great article, does it work well with autoscaling mechanism like for example kubernetes creating more pods? In this case each pod will have an own instance of SagaStateMachine. Will the messages belonging to the particular saga be consumed correctly?

要查看或添加评论,请登录

社区洞察