What is MassTransit?
Muhammed Hanifi Dikmen
Software Developer | Mathematics, Computer Science | Fulbright Alumni
MassTransit is a popular open-source NuGet package for building message-based applications with .NET. It is also an abstraction over message brokers such as RabbitMQ, Azure Service Bus, Amazon SQS, and ActiveMQ.
It supports message-based communication over the brokers listed above, sagas and distributed transactions, built-in retry, fault handling, and more.
In this article, I will show how to consume messages and how to consume the fault messages that are produced when an unhandled exception occurs while a message is being processed.
How to implement it?
I will keep the app as simple as I can, so I created a class library named Common and two console apps named Consumer and Producer.
Before starting, Docker Desktop needs to be installed on your local computer.
Install the MassTransit.AspNetCore and MassTransit.RabbitMQ packages in the Common project.
Install Microsoft.Extensions.Hosting in the Consumer and Producer projects.
Add a project reference to Common from both Consumer and Producer.
Let's assume that we receive a message consisting of Title and Content properties over the RabbitMQ message bus.
namespace Common.Events;

public class MessageSentEvent
{
    public string Title { get; set; }
    public string Content { get; set; }
}
using System.Reflection;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Common.Extentions;

public static class Extentions
{
    public static void AddCustomMassTransit(this IServiceCollection serviceCollection, Assembly? assembly = null)
    {
        serviceCollection.AddMassTransit(config =>
        {
            config.SetKebabCaseEndpointNameFormatter();

            // Register every consumer found in the given assembly, if one was supplied
            if (assembly != null)
                config.AddConsumers(assembly);

            // Configuration for RabbitMQ
            config.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host("localhost", host =>
                {
                    host.Username("guest");
                    host.Password("guest");
                });

                // This declares a separate endpoint of its own. The consumer endpoints created by
                // ConfigureEndpoints below are not affected by it, which is why faulted messages
                // still reach the _error queue later in this article.
                configurator.ReceiveEndpoint(configure =>
                {
                    configure.DiscardFaultedMessages();
                });

                configurator.ConfigureEndpoints(context);
            });
        });
    }
}
using Common.Events;
using MassTransit;

namespace Consumer.Consumers;

public partial class MessageSentEventConsumer : IConsumer<MessageSentEvent>
{
    public async Task Consume(ConsumeContext<MessageSentEvent> context)
    {
        try
        {
            // Reject messages without content; the exception below leads to a fault
            if (string.IsNullOrEmpty(context.Message.Content))
            {
                throw new ArgumentNullException(nameof(context.Message.Content), "Content is required");
            }

            Console.WriteLine($"Processing message: {context.Message.Title}");
            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error occurred while processing message: {ex.Message}");
            // Rethrow so that MassTransit sees the failure
            throw;
        }
    }
}
MessageCreatedEventConsumerFaultConsumer simply writes out the title of the message that failed during processing.
using Common.Events;
using MassTransit;

namespace Consumer;

public class MessageCreatedEventConsumerFaultConsumer : IConsumer<Fault<MessageSentEvent>>
{
    public Task Consume(ConsumeContext<Fault<MessageSentEvent>> context)
    {
        // Handle failed messages
        Console.WriteLine($"Message failed: {context.Message.Message.Title}");
        return Task.CompletedTask;
    }
}
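A fault message carries more than the original payload. As a minimal sketch, assuming you also want the exception details on the console, a variant of the fault consumer could read the Exceptions and Timestamp members of Fault<T>. The class name MessageSentEventFaultDetailsConsumer is hypothetical and is not part of the example repository.

```csharp
using System;
using System.Threading.Tasks;
using Common.Events;
using MassTransit;

namespace Consumer;

// Hypothetical variant of the fault consumer; the class name is an assumption.
public class MessageSentEventFaultDetailsConsumer : IConsumer<Fault<MessageSentEvent>>
{
    public Task Consume(ConsumeContext<Fault<MessageSentEvent>> context)
    {
        var fault = context.Message;
        Console.WriteLine($"Message failed: {fault.Message.Title} at {fault.Timestamp:O}");

        // Each ExceptionInfo entry describes one exception captured while consuming.
        foreach (var exception in fault.Exceptions)
        {
            Console.WriteLine($"{exception.ExceptionType}: {exception.Message}");
        }

        return Task.CompletedTask;
    }
}
```

Because consumers are registered by scanning the assembly, a class like this would be picked up automatically, so it is probably best used in place of the simpler fault consumer rather than next to it.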
Configure MassTransit in the Consumer console app.
using Common.Extentions;
using Microsoft.Extensions.Hosting;
using System.Reflection;

namespace Consumer;

public class Program
{
    protected Program()
    {
    }

    public static async Task Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddCustomMassTransit(Assembly.GetExecutingAssembly());
            })
            .Build();

        await host.RunAsync();
    }
}
using Common.Events;
using MassTransit;

namespace Producer.Services;

public class MessageService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MessageService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task CreateMessageAsync(string title, string content)
    {
        var messageSentEvent = new MessageSentEvent
        {
            Title = title,
            Content = content
        };

        await _publishEndpoint.Publish(messageSentEvent);
        Console.WriteLine($"Message with {title} was published.");
    }
}
Configure the message service in the Producer's Program.cs. When publishing the message, I intentionally send string.Empty as the content so that the consumer throws an ArgumentNullException.
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Producer.Services;

namespace Producer;

public class Program
{
    protected Program()
    {
    }

    public static async Task Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddScoped<MessageService>();
                services.AddMassTransit(config =>
                {
                    config.SetKebabCaseEndpointNameFormatter();
                    config.UsingRabbitMq((context, configurator) =>
                    {
                        configurator.Host("localhost", host =>
                        {
                            host.Username("guest");
                            host.Password("guest");
                        });
                        configurator.ConfigureEndpoints(context);
                    });
                });
            })
            .Build();

        // Start the host first so the MassTransit bus is running before anything is published
        await host.StartAsync();

        // MessageService is registered as scoped, so resolve it from a scope rather than the root provider
        using (var scope = host.Services.CreateScope())
        {
            var messageService = scope.ServiceProvider.GetRequiredService<MessageService>();
            string title = "MassTransit";
            // Kept for reference; string.Empty is sent instead to trigger the fault
            string message = "MassTransit is free software/open-source .NET-based Enterprise Service Bus software that helps .NET developers route messages over RabbitMQ, Azure Service Bus, SQS, and ActiveMQ service busses. It supports multicast, versioning, encryption, sagas, retries, transactions, distributed systems and other features.";
            await messageService.CreateMessageAsync(title, string.Empty);
        }

        await host.WaitForShutdownAsync();
    }
}
Let's test our app. First, start RabbitMQ with the management UI in Docker:
docker run -d --name messagebroker -p 15672:15672 -p 5672:5672 -p 5673:5673 rabbitmq:3-management
What is a fanout exchange in RabbitMQ?
A fanout exchange is a type of exchange that routes messages to all of its bound queues without any filtering.
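To make the definition concrete, here is a toy in-memory model of that behavior. It is only a sketch: it does not talk to RabbitMQ, and the FanoutExchangeSketch class and its members are hypothetical names invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a fanout exchange; it does not talk to RabbitMQ.
public sealed class FanoutExchangeSketch
{
    private readonly List<Queue<string>> _boundQueues = new();

    // Binding takes no routing key, because a fanout exchange does no filtering.
    public Queue<string> BindNewQueue()
    {
        var queue = new Queue<string>();
        _boundQueues.Add(queue);
        return queue;
    }

    // Every bound queue receives its own copy of the message.
    public void Publish(string message)
    {
        foreach (var queue in _boundQueues)
            queue.Enqueue(message);
    }
}
```

If two queues are bound and one message is published, both queues end up holding that message, which is the same reason every subscribed consumer endpoint gets its own copy of a published event.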
If you continue stepping over in the debugger, the ArgumentNullException will be thrown and the fault event will be triggered.
After the exception is thrown, you will see that one more queue, named "message-sent-event_error", has been created in the RabbitMQ management console.
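The queue name is not arbitrary. With the kebab-case endpoint name formatter configured earlier, the endpoint for MessageSentEventConsumer is named message-sent-event, and the error queue adds an _error suffix. The helper below is a hypothetical approximation of that naming rule, not MassTransit's own implementation, and it may differ from the real formatter for edge cases such as acronyms.

```csharp
using System;
using System.Text;

// Hypothetical helper, NOT MassTransit's implementation: it only approximates
// how a kebab-case formatter could turn a consumer class name into a queue name.
public static class QueueNameSketch
{
    public static string FromConsumerName(string consumerTypeName)
    {
        const string suffix = "Consumer";

        // Drop a trailing "Consumer" so MessageSentEventConsumer becomes MessageSentEvent.
        var name = consumerTypeName.EndsWith(suffix, StringComparison.Ordinal)
                   && consumerTypeName.Length > suffix.Length
            ? consumerTypeName.Substring(0, consumerTypeName.Length - suffix.Length)
            : consumerTypeName;

        var builder = new StringBuilder();
        for (var i = 0; i < name.Length; i++)
        {
            // Start a new hyphen-separated word at every uppercase letter except the first.
            if (char.IsUpper(name[i]) && i > 0)
                builder.Append('-');
            builder.Append(char.ToLowerInvariant(name[i]));
        }

        return builder.ToString();
    }

    // The error queue name is the endpoint name plus "_error".
    public static string ErrorQueueFor(string consumerTypeName) =>
        FromConsumerName(consumerTypeName) + "_error";
}
```

Calling QueueNameSketch.ErrorQueueFor("MessageSentEventConsumer") returns message-sent-event_error, which matches the queue shown in the management console.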
Click message-sent-event_error, and then click Get Messages.
You will see that the error, together with the exception stack trace, was sent to RabbitMQ so that you can process it further. Alternatively, you can discard these messages so that they are not moved to the _error queue.
You can discard them by adding a MessageSentEventConsumerDefinition class under an EventDefinitions folder in the Consumer console app.
Basically, configurator.ConfigureEndpoints(context) configures the endpoints automatically. It does not discard faulted messages, so "_error" queues are created. To avoid that, define a class that inherits from ConsumerDefinition and override ConfigureConsumer, which lets you apply custom configuration to each consumer.
using Consumer.Consumers;
using MassTransit;

public class MessageSentEventConsumerDefinition :
    ConsumerDefinition<MessageSentEventConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageSentEventConsumer> consumerConfigurator, IRegistrationContext context)
    {
        // Drop faulted messages instead of moving them to the _error queue
        endpointConfigurator.DiscardFaultedMessages();
        base.ConfigureConsumer(endpointConfigurator, consumerConfigurator, context);
    }
}
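The same override is also a natural place for other per-consumer settings, such as the built-in retry mentioned at the start of the article. The following is a sketch under assumptions: the class name MessageSentEventConsumerRetryDefinition and the retry values (3 attempts, 1 second apart) are illustrative choices, not taken from the example repository.

```csharp
using System;
using Consumer.Consumers;
using MassTransit;

// Hypothetical alternative definition; use it instead of, not alongside, the one above.
public class MessageSentEventConsumerRetryDefinition :
    ConsumerDefinition<MessageSentEventConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<MessageSentEventConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // Retry up to 3 times, 1 second apart, before the message is treated as faulted.
        endpointConfigurator.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(1)));

        // Still skip the _error queue once the retries are exhausted.
        endpointConfigurator.DiscardFaultedMessages();
    }
}
```

With the empty-content message from this article, every retry would fail in the same way, so retry is more useful for transient errors such as a temporarily unavailable database.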
Thanks for reading.
The GitHub repository is available here: https://github.com/mhdikmen/MassTransitExample1