Implementing Saga pattern in your .NET project using RabbitMQ and MongoDB

In a microservices architecture, multiple moving parts operate independently of each other within their processing contexts. Any failure within the process across distributed applications cannot be handled with a simple database transaction. This is where Saga comes to the rescue.

"A saga is a long-lived transaction managed by a coordinator. Sagas are initiated by an event, sagas orchestrate events, and sagas maintain the state of the overall transaction."

Saga coordination can be achieved through either orchestration or choreography. The pattern was first described in a paper presented at the 1987 ACM SIGMOD conference:

SIGMOD '87: Proceedings of the 1987 ACM SIGMOD international conference on Management of data


In this article, I will create a simple Saga state machine along with its compensating transactions. The motivation is my recent effort, over the past few weeks, to understand how sagas work with MassTransit, which led me to search for working examples. Although there are several articles and some videos online, I had a hard time following the end-to-end journey of a Saga state machine. Therefore, in this article, I will discuss the complete flow using MassTransit with RabbitMQ as the message queue. I will use the example from Milan Jovanović's videos on Sagas, although here there will be multiple services (APIs and Workers) simulating a truly distributed architecture. Finally, there will be a Saga orchestrator tracking the states of the Saga state machine, and in the event of any failure, we will trigger compensating transactions to update the database state and roll back the work done by previous steps. Let's get into the implementation.

First, we need to set up our application for the Saga state machine with MassTransit and RabbitMQ. We will be using MongoDB as the Saga repository. Let's create the Saga machine. I have created a project (.csproj) where I have placed all the common classes, events, commands, and finally the state machine. Let's consider this scenario as an example.

We have a newsletter application where users register for a 15-day trial. In return, they first receive a welcome email, and then a follow-up email about the 15-day free policy. Finally, they are onboarded into the system: their profile is updated, and custom settings/preferences are applied and saved to the database.

We need to create all the necessary commands and events for the problem stated.

namespace Shared.Entities
{
   /* Commands */

    public record SubscribeToNewsletter(string Email);

    public record SendWelcomeEmail(Guid SubsciberId, string Email);

    public record SendFollowUpEmail(Guid SubsciberId, string Email);

    public record FinalizeOnboarding(Guid SubsciberId, string Email);

    /* Revert commands */

    public record RevertSendWelcomeEmail(Guid SubsciberId, string Email);

    public record RevertSendFollowUpEmail(Guid SubsciberId, string Email);

    public record RevertOnboarding(Guid SubsciberId, string Email);

}        

Now let's create the events that we are going to dispatch when any command is executed.

namespace Shared.Entities
{
    public class SubscriberCreatedEvent
    {  
        public Guid SubscriberId { get; init; }
        public string Email { get; init; }
    }

    public class WelcomeEmailSent
    {
        public Guid SubscriberId { get; init; }
        public string Email { get; init; }
    }

    public class FollowUpEmailSent
    {
        public Guid SubscriberId { get; init; }
        public string Email { get; init; }
    }
    
    public class OnboardingCompleted
    {
        public Guid SubscriberId { get; init; }
        public string Email { get; init; }
    }

    public class JobCompleted
    {
        public Guid SubscriberId { get; init; }
        public string Email { get; init; }
    }
}        

When the users subscribe to the newsletter we will create an entry in the database. Let's create an entity named Subscriber.

    public class Subscriber
    {
        public Guid Id { get; set; }         
        public string Email { get; set; }
        public DateTime SubscribedOnUtc { get; set; }
    }        

Let's look at the project structure so far.

Figure: Entities project structure

Now we will move forward and create the Saga state machine using these events and commands. First, we need to create the SagaData entity, which will be stored in the database and keep track of the running state of our state machine. When a failure occurs, these records are updated in the database accordingly.

using MassTransit;

namespace Shared.Sagas
{
    public class OnboardingSagaData : SagaStateMachineInstance, ISagaVersion
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }

        public Guid SubscriberId { get; set; }
        public string Email {get; set; } = string.Empty;

        public bool WelcomeEmailSent { get; set; }
        public bool FollowUpEmailSent { get; set; }
        public bool OnboardingCompleted { get; set; }

        public int Version { get; set; }
    }
}        

One important thing to notice here is the three flags: WelcomeEmailSent, FollowUpEmailSent, and OnboardingCompleted. Over the lifetime of the saga, we will update these flags in the database, and when any internal failure occurs, the compensating transactions will revert their values. Now let's set up the state machine. As the very first step, let's define its states.

public class OnboardingSaga : MassTransitStateMachine<OnboardingSagaData>
{
    /* States */

    public State Welcoming { get; set; }
    public State WelcomeFaulted { get; set; }

    public State FollowingUp { get; set; }
    public State FollowingUpFaulted { get; set; }

    public State Onboarding { get; set; }
    public State OnboardingFaulted { get; set; }

    public State Finished { get; set; }
}        

When we are finished defining the states we have to define the events corresponding to the states.

public class OnboardingSaga : MassTransitStateMachine<OnboardingSagaData>
{
    /* Events */

    public Event<SubscriberCreatedEvent> SubscriberCreated { get; set; }
    public Event<Fault<SubscriberCreatedEvent>> SubscriberCreateFailed { get; set; }

    public Event<WelcomeEmailSent> WelcomeEmailSent { get; set; }
    public Event<Fault<WelcomeEmailSent>> WelcomeEmailFailed { get; set; }

    public Event<FollowUpEmailSent> FollowUpEmailSent { get; set; }
    public Event<Fault<FollowUpEmailSent>> FollowUpEmailFailed { get; set; }

    public Event<OnboardingCompleted> OnboardingDone { get; set; }
    public Event<Fault<OnboardingCompleted>> OnboardingFailed { get; set; }
    
    public Event<JobCompleted> End { get; set; }
}        

These are the events we will dispatch and listen to during the stages of the application lifecycle. Event<T> denotes success and Event<Fault<T>> denotes failure. On success, we move forward, do the next job, and move the state machine to the successive state. On Fault<T>, we compensate: the state machine moves to the corresponding faulted state and the necessary rollbacks are performed. Next, we have to configure the correlation of the events on the state machine.

public class OnboardingSaga : MassTransitStateMachine<OnboardingSagaData>
{
    public OnboardingSaga()
    {
        /* Configure correlation */

        InstanceState(x => x.CurrentState);

        Event(() => SubscriberCreated, e => e.CorrelateById(m => m.Message.SubscriberId));
        Event(() => SubscriberCreateFailed, e => e.CorrelateById(m => {
            return m.Message.Message.SubscriberId; 
        }));
        
        Event(() => WelcomeEmailSent, e => e.CorrelateById(m => m.Message.SubscriberId));
        Event(() => WelcomeEmailFailed, e => e.CorrelateById(m =>
        {
            return m.Message.Message.SubscriberId;
        }));
        
        Event(() => FollowUpEmailSent, e => e.CorrelateById(m => m.Message.SubscriberId));
        Event(() => FollowUpEmailFailed, e => e.CorrelateById(m => {
            return m.Message.Message.SubscriberId; 
        }));

        Event(() => OnboardingDone, e => e.CorrelateById(m => m.Message.SubscriberId));
        Event(() => OnboardingFailed, e => e.CorrelateById(m => { 
            return m.Message.Message.SubscriberId; 
        }));

        Event(() => End, e => e.CorrelateById(m => m.Message.SubscriberId));
    }
}        
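
To make the difference between m.Message.SubscriberId and m.Message.Message.SubscriberId more concrete, here is a minimal sketch in plain C# that does not use MassTransit at all. A fault wraps the original message, so the id sits one level deeper. The names WelcomeEmailSentMessage, FaultEnvelope, SagaInstance, and InMemoryCorrelator are hypothetical and exist only for this illustration; in the real application the lookup is done by MassTransit against the saga repository.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; these are NOT MassTransit types.
public record WelcomeEmailSentMessage(Guid SubscriberId, string Email);

// Mimics the shape of a fault: the message that failed is nested inside it.
public record FaultEnvelope<T>(T Message, string Reason);

public class SagaInstance
{
    public Guid CorrelationId { get; init; }
    public string CurrentState { get; set; } = "Welcoming";
}

public class InMemoryCorrelator
{
    // One saga instance per subscriber, keyed by the correlation id.
    private readonly Dictionary<Guid, SagaInstance> _instances = new();

    public SagaInstance Start(Guid subscriberId)
    {
        var instance = new SagaInstance { CorrelationId = subscriberId };
        _instances[subscriberId] = instance;
        return instance;
    }

    // Success event: the id sits directly on the message.
    public SagaInstance? Route(WelcomeEmailSentMessage message) =>
        Find(message.SubscriberId);

    // Fault: the id sits one level deeper, on the wrapped message.
    public SagaInstance? Route(FaultEnvelope<WelcomeEmailSentMessage> fault) =>
        Find(fault.Message.SubscriberId);

    private SagaInstance? Find(Guid id) =>
        _instances.TryGetValue(id, out var instance) ? instance : null;
}
```

Calling Start(id) and then Route(...) with either the event or the fault carrying the same id returns the same instance, which is all that correlation means in this sketch.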

Once the correlation is configured, each dispatched event is delivered to the matching saga instance and handled by the corresponding event-behavior delegate. Let's set up the transitions now.

public class OnboardingSaga : MassTransitStateMachine<OnboardingSagaData>
{
    public OnboardingSaga()
    {
        /* Transitions */

        Initially(
            When(SubscriberCreated)
                .Then(context =>
                {
                    context.Saga.SubscriberId = context.Message.SubscriberId;
                    context.Saga.Email = context.Message.Email;
                })
                .TransitionTo(Welcoming)
                .Publish(context => new SendWelcomeEmail(context.Message.SubscriberId, context.Message.Email))
            );

        During(Welcoming,
            When(WelcomeEmailSent)
                .Then(context =>
                {
                    context.Saga.WelcomeEmailSent = true;
                })
                .TransitionTo(FollowingUp)
                .Publish(context => new SendFollowUpEmail(context.Message.SubscriberId, context.Message.Email)));
        
        During(FollowingUp,
            When(FollowUpEmailSent)
                .Then(context =>
                {
                    context.Saga.FollowUpEmailSent = true;
                    context.Saga.OnboardingCompleted = true;
                })
                .TransitionTo(Onboarding)
                .Publish(context => new FinalizeOnboarding(context.Message.SubscriberId, context.Message.Email)));

        During(Onboarding,
            When(End)
                .Finalize());
    }
}        

This syntax comes from Automatonymous, now known as Saga State Machines, which lets you configure the state machine and define which state should trigger what action and how you should move forward to the next state. For example, you might notice here that Initially denotes the start of the state transitions, and Finalize marks the end. Lastly, we have to configure the compensation states.
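
Before moving on to compensation, it may help to see the happy path as a plain lookup table. The sketch below is my own simplification in plain C#, not MassTransit code: the SagaState and SagaEvent enums and the OnboardingTransitions class are hypothetical names, and unexpected events are simply ignored here, which is a choice made only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names for illustration; the real states and events live on OnboardingSaga.
public enum SagaState { Initial, Welcoming, FollowingUp, Onboarding, Final }
public enum SagaEvent { SubscriberCreated, WelcomeEmailSent, FollowUpEmailSent, End }

public static class OnboardingTransitions
{
    // (current state, received event) -> next state, mirroring Initially/During/Finalize.
    private static readonly Dictionary<(SagaState, SagaEvent), SagaState> Table = new()
    {
        [(SagaState.Initial, SagaEvent.SubscriberCreated)] = SagaState.Welcoming,
        [(SagaState.Welcoming, SagaEvent.WelcomeEmailSent)] = SagaState.FollowingUp,
        [(SagaState.FollowingUp, SagaEvent.FollowUpEmailSent)] = SagaState.Onboarding,
        [(SagaState.Onboarding, SagaEvent.End)] = SagaState.Final,
    };

    // Returns the next state, or the current one when the event is not expected in this state.
    public static SagaState Next(SagaState current, SagaEvent received) =>
        Table.TryGetValue((current, received), out var next) ? next : current;
}
```

Feeding the four events in order through Next walks an instance from Initial to Final; an event that arrives in the wrong state leaves the state unchanged in this sketch.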

namespace Shared.Sagas
{
    public class OnboardingSaga : MassTransitStateMachine<OnboardingSagaData>
    {
        public OnboardingSaga()
        {
            DuringAny(
                When(OnboardingFailed)
                    .TransitionTo(OnboardingFaulted)
                    .Then(ctx =>
                    {
                        ctx.Saga.OnboardingCompleted = false;
                    })
                    // Publish through the state machine so the publish is awaited instead of fire-and-forget.
                    .Publish(ctx => new RevertOnboarding(ctx.Message.Message.SubscriberId, ctx.Message.Message.Email)));

            DuringAny(
                When(FollowUpEmailFailed)
                    .TransitionTo(FollowingUpFaulted)
                    .Then(ctx =>
                    {
                        ctx.Saga.FollowUpEmailSent = false;
                    })
                    .Publish(ctx => new RevertSendFollowUpEmail(ctx.Message.Message.SubscriberId, ctx.Message.Message.Email)));

            DuringAny(
                When(WelcomeEmailFailed)
                    .TransitionTo(WelcomeFaulted)
                    .Then(ctx =>
                    {
                        ctx.Saga.WelcomeEmailSent = false;
                    })
                    .Publish(ctx => new RevertSendWelcomeEmail(ctx.Message.Message.SubscriberId, ctx.Message.Message.Email)));
        }
    }
}        

Notice that when the OnboardingFailed event is encountered, we transition the state machine to the OnboardingFaulted state and publish a message that will undo all the onboarding-related work. When that is done, the follow-up email step is undone next, and finally the welcome step is reverted as well. That's the whole state machine, which takes care of the end-to-end transition of the events. Now, let's set up our command handlers.
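
Before the handlers, the compensation order described above can be sketched in plain C#. This is only an illustration under simplifying assumptions: OnboardingFlags and CompensationSketch are hypothetical names, each step sets exactly one flag, and the failure is simulated with a failAtStep parameter instead of a real Fault<T> message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical copy of the three saga flags, for illustration only.
public class OnboardingFlags
{
    public bool WelcomeEmailSent { get; set; }
    public bool FollowUpEmailSent { get; set; }
    public bool OnboardingCompleted { get; set; }
}

public static class CompensationSketch
{
    // Runs the steps in order. When failAtStep (1-based) is reached, that step and
    // every earlier step are reverted in reverse order. 0 means "no failure".
    public static List<string> Run(OnboardingFlags flags, int failAtStep = 0)
    {
        var steps = new (string Name, Action Execute, Action Revert)[]
        {
            ("Welcome", () => flags.WelcomeEmailSent = true, () => flags.WelcomeEmailSent = false),
            ("FollowUp", () => flags.FollowUpEmailSent = true, () => flags.FollowUpEmailSent = false),
            ("Onboarding", () => flags.OnboardingCompleted = true, () => flags.OnboardingCompleted = false),
        };

        var log = new List<string>();
        var reached = new Stack<(string Name, Action Execute, Action Revert)>();

        for (var i = 0; i < steps.Length; i++)
        {
            var step = steps[i];
            reached.Push(step);

            if (i + 1 == failAtStep)
            {
                // Unwind: revert the failing step first, then every earlier one.
                while (reached.Count > 0)
                {
                    var toRevert = reached.Pop();
                    toRevert.Revert();
                    log.Add($"{toRevert.Name} reverted");
                }
                break;
            }

            step.Execute();
            log.Add($"{step.Name} done");
        }

        return log;
    }
}
```

Calling CompensationSketch.Run(flags, failAtStep: 3) reverts Onboarding, then FollowUp, then Welcome, and leaves all three flags false, which is the same order the state machine follows.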

namespace Shared.Handlers
{
    public class SubscribeToNewsletterHandler : IConsumer<SubscribeToNewsletter>
    {
        private readonly IAppDbContext _appDbContext;
        private readonly ILogger<SubscribeToNewsletterHandler> _logger;

        public SubscribeToNewsletterHandler(IAppDbContext appDbContext, ILogger<SubscribeToNewsletterHandler> logger)
        {
            _appDbContext = appDbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SubscribeToNewsletter> context)
        {
            _logger.LogInformation("SubscribeToNewsletterHandler");

            var subscriber = new Subscriber()
            {
                Id = Guid.NewGuid(),
                Email = context.Message.Email,
                SubscribedOnUtc = DateTime.UtcNow
            };

            await _appDbContext.SaveItem(subscriber, "OrderDb");

            await context.Publish(new SubscriberCreatedEvent()
            {
                SubscriberId = subscriber.Id,
                Email = context.Message.Email
            });
        }
    }
}        

SubscriberCreatedEvent -> SubscriberCreated State Transition -> Welcoming -> SendWelcomeEmail

namespace Shared.Handlers
{
    public class SendWelcomeEmailHandler : IConsumer<SendWelcomeEmail>
    {
        private readonly IAppDbContext _appDbContext;
        private readonly ILogger<SendWelcomeEmailHandler> _logger;

        public SendWelcomeEmailHandler(
            IAppDbContext appDbContext, 
            ILogger<SendWelcomeEmailHandler> logger
            )
        {
            _appDbContext = appDbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SendWelcomeEmail> context)
        {
            try
            {
                _logger.LogInformation("SendWelcomeEmailHandler");

                await context.Publish(new WelcomeEmailSent()
                {
                    SubscriberId = context.Message.SubsciberId,
                    Email = context.Message.Email
                });
            }
            catch
            {
                await context.Publish<Fault<WelcomeEmailSent>>(new
                {
                    Message = new WelcomeEmailSent()
                    {
                        SubscriberId = context.Message.SubsciberId,
                        Email = context.Message.Email
                    }
                });
            }
        }
    }
}        

WelcomeEmailSent -> WelcomeEmailSent State Transition -> FollowingUp -> SendFollowUpEmail

namespace Shared.Handlers
{
    public class SendFollowUpEmailHandler : IConsumer<SendFollowUpEmail>
    {
        private readonly IAppDbContext _appDbContext;
        private readonly ILogger<SendFollowUpEmailHandler> _logger;

        public SendFollowUpEmailHandler(
            IAppDbContext appDbContext, 
            ILogger<SendFollowUpEmailHandler> logger
            )
        {
            _appDbContext = appDbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SendFollowUpEmail> context)
        {
            try
            {
                _logger.LogInformation("SendFollowUpEmailHandler");
                
                await context.Publish(new FollowUpEmailSent()
                {
                    SubscriberId = context.Message.SubsciberId,
                    Email = context.Message.Email
                });
            }
            catch
            {
                await context.Publish<Fault<FollowUpEmailSent>>(new
                {
                    Message = new FollowUpEmailSent()
                    {
                        SubscriberId = context.Message.SubsciberId,
                        Email = context.Message.Email
                    }
                });
            }
        }
    }
}        

FollowUpEmailSent -> FollowUpEmailSent State Transition -> Onboarding -> FinalizeOnboarding -> Finalize

namespace Shared.Handlers
{
    public class FinalizeOnboardingHandler : IConsumer<FinalizeOnboarding>
    {
        private readonly IAppDbContext _appDbContext;
        private readonly ILogger<FinalizeOnboardingHandler> _logger;

        public FinalizeOnboardingHandler(
            IAppDbContext appDbContext, 
            ILogger<FinalizeOnboardingHandler> logger
            )
        {
            _appDbContext = appDbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<FinalizeOnboarding> context)
        {
            try
            {
                _logger.LogInformation("OnboardingCompletedHandler");

                await context.Publish(new JobCompleted()
                {
                    SubscriberId = context.Message.SubsciberId,
                    Email = context.Message.Email
                });
            }
            catch
            {
                _logger.LogError("Error encountered.........");
                _logger.LogError("Reverting all changes");

                await context.Publish<Fault<OnboardingCompleted>>(new
                {
                    Message = new OnboardingCompleted()
                    {
                        SubscriberId = context.Message.SubsciberId,
                        Email = context.Message.Email
                    }
                });
            }
        }
    }
}        

Corresponding to the handlers we have revert/compensation handlers as well. Please go through the GitHub repository to learn more.

Now let's install the NuGet packages and set up the IoC container. We will prepare the WebAPI project first.

<PackageReference Include="MassTransit.MongoDb" Version="8.2.2" />        

This NuGet package allows MongoDB to be used as the Saga repository.

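Make sure you have the RabbitMQ and MongoDB servers running as well. To start a RabbitMQ server with Docker, run the command below on the command line. If the applications run on the host rather than inside the same Docker network, you will likely also need to publish the AMQP port, for example by adding -p 5672:5672.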

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3        

Also install the MongoDB Community Server by following its official installation guide.

    builder.Services.AddMassTransit(config =>
    {
        config.UsingRabbitMq((r, c) =>
        {
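            c.Host(builder.Configuration["RabbitMqServer"]);
            // Bus-level middleware is registered before ConfigureEndpoints so it applies to the endpoints created there.
            c.UseInMemoryOutbox(r);
            c.ConfigureEndpoints(r);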
        });
        config.AddSagaStateMachine<OnboardingSaga, OnboardingSagaData>()
            .MongoDbRepository(r =>
            {
                r.Connection = "mongodb://127.0.0.1:27017";
                r.DatabaseName = "OrderDb";
                r.CollectionName = "SagaDatas";
            });

        busConfigurator?.Invoke(config);
    });        

Now we have to prepare the Worker services. Multiple worker services will work just fine, but for the sake of this article, I will consider a single worker service. We have to register the handlers as RabbitMQ consumers.

Type[] handlers = new[] {
        typeof(SubscribeToNewsletterHandler),
        typeof(FinalizeOnboardingHandler),
        typeof(RevertOnboardingHandler),
        typeof(SendWelcomeEmailHandler),
        typeof(SendFollowUpEmailHandler),
        typeof(RevertSendFollowUpEmailHandler),
        typeof(RevertSendWelcomeEmailHandler),
    };

IHostBuilder builder = Host.CreateDefaultBuilder(_options.CommandLineArgs);
builder.ConfigureServices((settings, services) =>
{
    services.AddMassTransit((IBusRegistrationConfigurator config) =>
    {
        // Register every handler type as a MassTransit consumer.
        foreach (var foundHandler in handlers)
        {
            config.AddConsumer(foundHandler);
            config.AddRequestClient(foundHandler);
        }
    });
});

That's about it. Now when you run both applications, your Saga state machine will initialize, and the required exchanges and queues will be created on the RabbitMQ server. All the saga states will be stored in the MongoDB database.

Now let's follow through an end-to-end process using the system that we created. First, a user sends a request to an API /StartOnboarding. Ideally, the flow should be this.

StartOnboarding -> SubscribeToNewsletter -> SendWelcomeEmail -> SendFollowUpEmail -> OnboardingCompleted

Let's look at the actual outcome of that.

Figure: Console log of the process

Figure: Database Saga data for the operation

We can see that all the step flags are set to true, meaning there were no errors and the process ended as expected. Now, let's imagine that some network or logic error occurs in OnboardingCompletedHandler and the code throws an exception. Let's follow along with that scenario by placing a breakpoint in the onboarding handler before simulating the exception. Up to the breakpoint, no exception has occurred and everything has gone as expected, so all the flags are marked true. Now it's time to simulate the exception.

Figure: Database Saga data for the operation

Figure: Simulating an error at the last handler

Figure: Compensation transactions in action

Now if you go to the database Saga data collection you will notice all the flags are updated according to state machine configuration. During this compensation, all the corresponding transactions took place which reverted the whole flow to the original state.

Figure: Updated saga data

All the flags are updated and marked as false. This is due to the compensation and rollback configuration defined within the state machine. That's it. We have successfully created an orchestration-based Saga state machine using MassTransit with RabbitMQ as a messaging queue and MongoDB as the database provider.

Feel free to share your thoughts and LIKE this article if it helps. If you made it this far you are awesome ;)
