What is MassTransit ?

What is MassTransit ?

MasstTransit is a popular open-source nuget package for building message-based applications using .NET.? It is also abstraction over message brokers such as RabbitMQ, Azure Service Bus, SQS , and ActiveMQ.

It supports message-based communication, multiple messaging brokers that I mentioned in the definition, distributed transactions and saga, built-in retry, fault handling etc.

In this article, I will show how to consume the messages and also consume fault messages that are occurred by unhandled exceptions while processing the messages.


How to implement it ?

First of all, I will keep the app as simple as I can. That's why I create a class library named Common, and two console apps that are named Consumer and Producer.

Before starting, Docker Desktop needs to be installed on your local computer.

Install MassTransit.AspNetCore and MassTransit.RabbitMQ packages for Common layer

Install Microsoft.Extensions.Hosting for Consumer and Producer layers.

From Consumer and Producer layers add project reference to Common layer.

Let's assume that we are receiving message that consists of Title and Content properties over RabbitMQ Message Bus.

  • In Common layer create a new folder called Events and a class named MessageSentEvent that will present a message contract under Events folder.

namespace Common.Events;

public class MessageSentEvent
{
    public string Title { get; set; }
    public string Content { get; set; }
}        

  • In Common layer create a new folder called Extentions and a class named Extentions under Extentions folder.

using System.Reflection;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Common.Extentions;

public static class Extentions
{
	public static void AddCustomMassTransit(this IServiceCollection serviceCollection, Assembly? assembly = null)
	{
		serviceCollection.AddMassTransit(config =>
		{
			config.SetKebabCaseEndpointNameFormatter();

			// Configuration for RabbitMQ
			if (assembly != null)
				config.AddConsumers(assembly);

			config.UsingRabbitMq((context, configurator) =>
			{
				configurator.Host("localhost", host =>
				{
					host.Username("guest");
					host.Password("guest");
				});

				configurator.ReceiveEndpoint(configure =>
				{
					configure.DiscardFaultedMessages();
				});

				configurator.ConfigureEndpoints(context);
			});
		});
	}
}
        

  • In Consumer layer, create consumers called MessageSentEventConsumer, and MessageSentEventConsumerFaultConsumer under Consumers folder .
  • In MessageSentEventConsumer, When reciving the message from the message bus, I will intentionally throw an exception to make the message handled by MessageSentEventConsumerFaultConsumer.

using Common.Events;
using MassTransit;

namespace Consumer.Consumers;

public partial class MessageSentEventConsumer : IConsumer<MessageSentEvent>
{
    public async Task Consume(ConsumeContext<MessageSentEvent> context)
    {
        try
        {
            if (string.IsNullOrEmpty(context.Message.Content))
            {
                throw new ArgumentNullException(nameof(context.Message.Content), " content is required");
            }

            Console.WriteLine($"Processing message: {context.Message.Title}");
            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error occurred while processing message: {ex.Message}");
            throw;
        }
    }
}        

In MessageCreatedEventConsumerFaultConsumer, I am just writing the message title that was belongs to the message that got failed while processing.

using Common.Events;
using MassTransit;

namespace Consumer;

public class MessageCreatedEventConsumerFaultConsumer : IConsumer<Fault<MessageSentEvent>>
{
    public Task Consume(ConsumeContext<Fault<MessageSentEvent>> context)
    {
        // Handle failed messages
        Console.WriteLine($"Message failed: {context.Message.Message.Title}");
        return Task.CompletedTask;
    }
}        

Configuring the MassTransit In Consumer console.

using Common.Extentions;
using Microsoft.Extensions.Hosting;
using System.Reflection;

namespace Consumer;

public class Program
{
    protected Program()
    {
    }
    public static async Task Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddCustomMassTransit(Assembly.GetExecutingAssembly());
            })
            .Build();

        await host.RunAsync();
    }
}        

  • After the consumer stuff is done. We need a producer to produce messages.
  • In Producer console app, create a MessageService under Services folder to publish messages.

using Common.Events;
using MassTransit;

namespace Producer.Services;

public class MessageService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MessageService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task CreateMessageAsync(string title, string content)
    {
        var messageSentEvent = new MessageSentEvent
        {
            Title = title,
            Content = content
        };
        await _publishEndpoint.Publish(messageSentEvent);
        Console.WriteLine($"Message with {title} was published.");
    }
}        

Configure the message service in Program.cs. When I publish the message, I intentionally send the content as string.Empty to get ArgumentNullException while consuming it.

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Producer.Services;

namespace Producer;

public class Program
{
    protected Program()
    {
    }
    public static async Task Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddScoped<MessageService>();
                services.AddMassTransit(config =>
                {
                    config.SetKebabCaseEndpointNameFormatter();

                    config.UsingRabbitMq((context, configurator) =>
                    {
                        configurator.Host("localhost", host =>
                        {
                            host.Username("guest");
                            host.Password("guest");
                        });
                        configurator.ConfigureEndpoints(context);
                    });
                });
            })
            .Build();
        var messageService = host.Services.GetRequiredService<MessageService>();

        string title = "MassTransit";
        string message = "MassTransit is free software/open-source .NET-based Enterprise Service Bus software that helps .NET developers route messages over RabbitMQ, Azure Service Bus, SQS, and ActiveMQ service busses. It supports multicast, versioning, encryption, sagas, retries, transactions, distributed systems and other features.";

        await messageService.CreateMessageAsync(title, string.Empty);

        await host.RunAsync();
    }
}        

Let's test our app

  • First, we need a up and running RabbitMQ container. To have it, run this commad ;

docker run -d --name messagebroker -p 15672:15672 -p 5672:5672 -p 5673:5673 rabbitmq:3-management         

  • Run Consumer console app to consume messages from RabbitMQ.
  • Open the RabbitMQ management console. You will see that exchanges are created by using fanout type.


What is fanout exchange in RabbitMQ ?

A fanout exchange is a type of exchange that routes messages to all of its bound queues without any filtering.

  • Then, Put break point in at the beginning of the Consume method in MessageSentEventConsumer class and the Consume method in MessageSentEventConsumerFaultConsumer..
  • Create a new debugging instance on Producer console app. You will see the message that will be "Message with MassTransit was published." in console.
  • After getting the message, the breakpoint will be hit. While stepping over the line by line, you will see that, you will get a message with empty content then hit the if-statement. After that, ArgumentNullException will be throwed. Before throwing the error. I would like to show screenshot of rabbitmq management console below ;


Before throwing the error


Message before throwing the error

If you continue stepping over, the ArgumentNullException will be throwed, and fault event gets triggered.


After throwing the error, fault handler events gets triggered


After the exception is thrown, you will see one more queue is created and that is called "message-sent-event error" in the rabbitmq management console.


message-sent-event_error got created by MassTransit

Click the message-sent-event_error, and then click the Get Messages.


message-sent-event_error queue message

You will see that the error we got with exception stack trace sent to the RabbitMQ for further processing as you wish or you can discard these messages to not be passed to the _error queue.

You can discard by adding MessageSentEventConsumerDefinition under EventDefinitions folder in Consumer console app.

Basically, Using configurator.ConfigureEndpoints(context); configures endpoints automatically, is not discarding fault messages, and creating "_error" queues. To avoid that situation, defining a class that inherits from ConsumerDefinition and by overriding the ConfigureConsumer helps to make custom configurations for each events.

public class MessageSentEventConsumerDefinition :
	ConsumerDefinition<MessageSentEventConsumer>
{
	protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageSentEventConsumer> consumerConfigurator, IRegistrationContext context)
	{
        endpointConfigurator.DiscardFaultedMessages();
		base.ConfigureConsumer(endpointConfigurator, consumerConfigurator, context);
	}
}        





Thanks for your reading.

Here is the github repository link below : https://github.com/mhdikmen/MassTransitExample1

要查看或添加评论,请登录

社区洞察

其他会员也浏览了