Message Queuing in Microservices
1. Message Queuing
1.1 How it Works
1.2 Benefits in Microservices
1.3 Common Use Cases
1.4 Popular Message Queue Technologies
1.5 Considerations
1.6 Comparison
Overall, message queuing plays a crucial role in enabling asynchronous, scalable, and reliable communication between microservices, making it an essential tool for building resilient and adaptable distributed systems.
2. RabbitMQ
2.1 Specific details
2.2 RabbitMQ's Architecture
There are four exchange types, plus the pre-declared default exchange:
- Direct: Routes messages based on an exact match between the routing key and the binding key.
- Fanout: Broadcasts messages to all bound queues. The routing key is ignored.
- Topic: Routes messages by pattern matching on the routing key, e.g. "ship.shoes" in the diagram above.
- Headers: Routes messages based on message headers. The routing key is ignored completely.
- Default: A direct exchange with an empty name to which every queue is bound automatically, using the queue name as the binding key, e.g. the "inv" routing key in the diagram above.
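The routing rules above can be sketched without a broker. The following is a minimal in-memory illustration, not RabbitMQ's implementation: the class name ExchangeRoutingSketch and its methods are hypothetical, and headers exchanges are left out. It assumes the usual topic wildcards, where "*" stands for exactly one dot-separated word and "#" for zero or more words.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of exchange routing; it never talks to a broker.
public static class ExchangeRoutingSketch
{
    // Topic matching: keys are dot-separated words.
    // "*" matches exactly one word, "#" matches zero or more words.
    public static bool TopicMatches(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        if (p == pattern.Length) return w == words.Length;
        if (pattern[p] == "#")
        {
            // Let "#" absorb 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next)) return true;
            }
            return false;
        }
        if (w == words.Length) return false;
        return (pattern[p] == "*" || pattern[p] == words[w])
            && Match(pattern, p + 1, words, w + 1);
    }

    // Returns the queues that would receive a message, given (queue, bindingKey) bindings.
    public static List<string> Route(string exchangeType, string routingKey,
        IEnumerable<(string Queue, string BindingKey)> bindings)
    {
        switch (exchangeType)
        {
            case "fanout": // routing key is ignored
                return bindings.Select(b => b.Queue).Distinct().ToList();
            case "direct": // exact match on the binding key
                return bindings.Where(b => b.BindingKey == routingKey)
                               .Select(b => b.Queue).Distinct().ToList();
            case "topic":  // wildcard match on the binding key
                return bindings.Where(b => TopicMatches(b.BindingKey, routingKey))
                               .Select(b => b.Queue).Distinct().ToList();
            default:
                throw new ArgumentException("Unsupported exchange type: " + exchangeType);
        }
    }
}
```

For example, with a binding key of "ship.*", the routing key "ship.shoes" matches while "ship.shoes.red" does not; a binding key of "ship.#" would match both.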
2.3 Additional Components
2.4 Benefits of this Architecture
2.5 Code snippets in C#
Establishing a Connection
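// The snippets in this section use the synchronous API of RabbitMQ.Client 6.x
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;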
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
Declaring a Queue
channel.QueueDeclare(queue: "my_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
Publishing a Message:
var message = Encoding.UTF8.GetBytes("Hello, world!");
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: message);
Console.WriteLine("Message published");
Consuming a Message:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received message: {0}", message);
};
channel.BasicConsume(queue: "my_queue", autoAck: true, consumer: consumer);
Console.WriteLine("Waiting for messages...");
Closing the Connection:
// ... (asynchronous operations)
channel.Close();
connection.Close();
2.6 Additional Snippets
Declaring an Exchange:
channel.ExchangeDeclare(exchange: "my_exchange", type: ExchangeType.Direct);
Binding a Queue to an Exchange:
channel.QueueBind(queue: "my_queue", exchange: "my_exchange", routingKey: "my_routing_key");
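Setting Message Persistence:
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // survives a broker restart only if the queue is declared with durable: true
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: properties, body: message);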
Handling Acknowledgements:
// Call inside the Received handler; requires BasicConsume(..., autoAck: false, ...)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Using Publisher Confirms:
channel.ConfirmSelect();
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: message);
channel.WaitForConfirmsOrDie();
Implementing Error Handling:
try
{
// RabbitMQ operations
}
catch (Exception ex)
{
// Log the error
}
3. Apache Kafka
3.1 Technical Aspects
3.2 Apache Kafka's architecture elements
1. Topics:
2. Producers:
3. Brokers:
4. Consumers:
5. Partitions:
6. Offsets:
7. KRaft:
3.3 Additional Components:
3.4 Key Advantages:
3.5 Apache Kafka Security:
Apache Kafka offers various security features and mechanisms to protect your data and ensure your messaging platform remains secure.
Here's a breakdown of key aspects:
3.5.1. Authentication:
3.5.2. Authorization:
3.5.3. Data Encryption:
3.5.4. Audit Logging:
3.5.5. Secure Operations:
3.6 Apache Kafka vs Confluent Kafka:
3.7 Code Snippet in C#
Producing Messages:
using Confluent.Kafka;
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
var message = new Message<Null, string> { Value = "Hello, Kafka!" };
producer.ProduceAsync("my-topic", message).GetAwaiter().GetResult();
Console.WriteLine("Message produced");
Consuming Messages:
using Confluent.Kafka;
var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "my-group" };
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("my-topic");
while (true)
{
var consumeResult = consumer.Consume();
var message = consumeResult.Message.Value;
Console.WriteLine("Consumed message: {0}", message);
}
3.8 Additional Snippets
Creating a Topic:
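using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
// Admin clients are created through AdminClientBuilder
using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();
var newTopic = new TopicSpecification { Name = "my-new-topic", NumPartitions = 1, ReplicationFactor = 1 }; // 1 partition, 1 replica
adminClient.CreateTopicsAsync(new List<TopicSpecification> { newTopic }).GetAwaiter().GetResult();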
Setting Message Key:
// A string key requires a producer built as ProducerBuilder<string, string>
var message = new Message<string, string> { Key = "my-key", Value = "My message" };
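The key matters because Kafka uses it to choose a partition: messages with the same key land in the same partition, which preserves their relative order. The sketch below illustrates the idea only. PartitionSketch is a hypothetical helper, and FNV-1a is a stand-in hash chosen for brevity; it is not the partitioner the Kafka clients actually use, so the partition numbers it produces will not match a real cluster.

```csharp
using System;
using System.Text;

// Hypothetical illustration of key-based partitioning; not Kafka's real partitioner.
public static class PartitionSketch
{
    // 32-bit FNV-1a hash, used here only as a simple deterministic stand-in.
    public static uint Fnv1a(byte[] data)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in data)
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key to a partition index in [0, partitionCount).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(Encoding.UTF8.GetBytes(key)) % (uint)partitionCount);
    }
}
```

Calling PartitionSketch.PartitionFor("my-key", 3) repeatedly returns the same index each time. Changing the partition count can change the mapping, which is one reason to pick the number of partitions carefully when ordering per key matters.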
Handling Asynchronous Operations:
producer.ProduceAsync("my-topic", message).ContinueWith(task =>
{
if (task.IsFaulted)
{
Console.WriteLine("Error producing message: {0}", task.Exception.Message);
}
});
Using Consumer Offsets:
// Commit offsets manually (set EnableAutoCommit = false in ConsumerConfig)
consumer.Commit(consumeResult);
// Assign a specific partition and starting offset instead of calling Subscribe
consumer.Assign(new List<TopicPartitionOffset>() { new TopicPartitionOffset("my-topic", 0, 0) });
Error Handling:
try
{
// Kafka operations
}
catch (KafkaException ex)
{
// log the error
}
4. Amazon SQS
4.1 Specific Details:
4.2 Amazon SQS Architecture:
4.3 Optional Components:
4.4 Benefits of SQS Architecture:
4.5 Visibility timeout
4.6 Code Snippet with SQS and SNS in C#
Amazon SQS (Simple Queue Service):
1. Sending a Message:
using Amazon.SQS;
using Amazon.SQS.Model;
var sqsClient = new AmazonSQSClient();
var queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue";
var messageRequest = new SendMessageRequest
{
QueueUrl = queueUrl,
MessageBody = "Hello, SQS!"
};
var sendMessageResponse = await sqsClient.SendMessageAsync(messageRequest);
Console.WriteLine("Message sent with ID: {0}", sendMessageResponse.MessageId);
2. Receiving a Message:
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = queueUrl,
MaxNumberOfMessages = 1, // Return at most 1 message per call (SQS allows up to 10)
VisibilityTimeout = 30 // Messages are hidden for 30 seconds after retrieval
};
var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);
if (receiveMessageResponse.Messages.Count > 0)
{
var message = receiveMessageResponse.Messages[0];
Console.WriteLine("Received message: {0}", message.Body);
// Process the message here
// Delete the message from the queue
var deleteMessageRequest = new DeleteMessageRequest
{
QueueUrl = queueUrl,
ReceiptHandle = message.ReceiptHandle
};
await sqsClient.DeleteMessageAsync(deleteMessageRequest);
}
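The VisibilityTimeout setting used in the receive request can be easier to follow with a small model. The sketch below is a hypothetical in-memory queue, not the SQS client or service: VisibilityQueueSketch and its members are invented for illustration, and the caller passes the current time explicitly so the behaviour is deterministic. It shows the core idea only: a received message is hidden rather than removed, and it becomes visible again if it is not deleted before the timeout expires.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory queue that imitates visibility-timeout behaviour.
public class VisibilityQueueSketch
{
    private class Entry
    {
        public string Body;
        public DateTime VisibleAt;    // the message is hidden until this time
        public string ReceiptHandle;  // set on each receive; needed to delete
    }

    private readonly List<Entry> _entries = new List<Entry>();

    public void Send(string body, DateTime now)
    {
        _entries.Add(new Entry { Body = body, VisibleAt = now });
    }

    // Returns the first visible message and hides it for visibilityTimeout,
    // or null when no message is currently visible.
    public (string Body, string ReceiptHandle)? Receive(DateTime now, TimeSpan visibilityTimeout)
    {
        foreach (var entry in _entries)
        {
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + visibilityTimeout;
                entry.ReceiptHandle = Guid.NewGuid().ToString();
                return (entry.Body, entry.ReceiptHandle);
            }
        }
        return null;
    }

    // Removes the message permanently; returns false if the handle is unknown.
    public bool Delete(string receiptHandle)
    {
        if (receiptHandle == null) return false;
        return _entries.RemoveAll(e => e.ReceiptHandle == receiptHandle) > 0;
    }
}
```

In this model, a consumer that receives a message with a 30-second timeout and then crashes leaves the message in the queue; a second Receive call made more than 30 seconds later returns it again. This is why consumers should delete a message only after processing succeeds, and why processing should tolerate seeing the same message more than once.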
Amazon SNS (Simple Notification Service):
1. Publishing a Message to a Topic:
using Amazon.SNS;
using Amazon.SNS.Model;
var snsClient = new AmazonSNSClient();
var topicArn = "arn:aws:sns:us-east-1:123456789012:my-topic";
var publishRequest = new PublishRequest
{
TopicArn = topicArn,
Message = "Hello, SNS subscribers!"
};
var publishResponse = await snsClient.PublishAsync(publishRequest);
Console.WriteLine("Message published with ID: {0}", publishResponse.MessageId);
2. Subscribing an SQS Queue to an SNS Topic:
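var queueArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"; // ARN of the queue behind queueUrl
var subscribeRequest = new SubscribeRequest
{
TopicArn = topicArn,
Protocol = "sqs",
Endpoint = queueArn // For the "sqs" protocol the endpoint is the queue ARN, not its URL
};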
var subscribeResponse = await snsClient.SubscribeAsync(subscribeRequest);
Console.WriteLine("Subscription ARN: {0}", subscribeResponse.SubscriptionArn);
Remember:
5. Azure Service Bus
5.1 Technical Aspects:
5.2 Azure Service Bus Architecture
1. Topics and Subscriptions:
2. Queues: Message buffers that store messages in order until consumed. Azure Service Bus offers three queue types:
3. Producers and Consumers:
4. Azure Service Bus Service:
5.3 Optional Components:
5.4 Benefits of Azure Service Bus Architecture:
5.5 Code snippet in C#
1. Sending a Message to a Queue:
using Azure.Messaging.ServiceBus;
string connectionString = "ServiceBusConnectionString";
string queueName = "myqueue";
ServiceBusClient client = new ServiceBusClient(connectionString);
ServiceBusSender sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage("Hello, world!");
await sender.SendMessageAsync(message);
Console.WriteLine("Message sent");
2. Receiving a Message from a Queue:
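ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
// The handler receives a single ProcessMessageEventArgs argument
processor.ProcessMessageAsync += async args =>
{
Console.WriteLine("Received message: {0}", args.Message.Body);
await args.CompleteMessageAsync(args.Message);
};
// An error handler is required; StartProcessingAsync throws without one
processor.ProcessErrorAsync += args =>
{
Console.WriteLine("Error: {0}", args.Exception.Message);
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
Console.WriteLine("Waiting for messages...");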
3. Sending a Message to a Topic:
string topicName = "mytopic";
ServiceBusSender sender = client.CreateSender(topicName);
ServiceBusMessage message = new ServiceBusMessage("Hello, subscribers!");
await sender.SendMessageAsync(message);
4. Receiving a Message from a Subscription:
string subscriptionName = "mysubscription";
ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
5.6 Additional Snippets:
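Creating a Queue or Topic:
// Entity management lives on ServiceBusAdministrationClient, not ServiceBusClient
using Azure.Messaging.ServiceBus.Administration;
var adminClient = new ServiceBusAdministrationClient(connectionString);
await adminClient.CreateQueueAsync(queueName);
await adminClient.CreateTopicAsync(topicName);
Deleting a Queue or Topic:
await adminClient.DeleteQueueAsync(queueName);
await adminClient.DeleteTopicAsync(topicName);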
Setting Message Headers:
message.ApplicationProperties["Priority"] = "High";
Using Scheduled Messages:
message.ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(5);
Handling Exceptions:
try
{
// ServiceBus operations
}
catch (ServiceBusException ex)
{
// Log the error message
}