Transactional outbox pattern

Transactional outbox pattern


Often times, we find ourselves developing a feature that needs to publish an event. On a few occasions, message delivery must be part of saving the state, leading to distributed transactions. This mean that a message must be published if save was completed. There are many implementations, from incomplete to highly complex. I did an implement on one of my personal projects following the Transactional outbox pattern.

How to save and publish an event?

These problems are decomposed in discreet functions.

  1. Save the state along with an event using the database transaction capabilities.
  2. Pool events from the database.
  3. Keep track of published events to handle duplicates.
  4. Publish events.
  5. Update event status or delete event from database. These functions are implemented as asynchronous task of the Hosted service for sending and receiving messages accordingly.


Save state along with event:

Below the methos implemented as part of a repository class. The save method add the event into the DbSet<OutgoingEventEntity>. The event entity contains the information of message broker and keep a counter of failing. Later in this article, see the hosted service logic responsible for reading and keeping the event metadata updated as the service try to deliver the message.

Details about the repository pattern are not being documented as part of this article.

public class OutgoingEventEntity : Entity
{
    public string MsgQueueEndpoint { get; set; } = string.Empty;
    public string MsgQueueName { get; set; } = string.Empty;
    public string Type { get; set; } = string.Empty;
    public string Version { get; set; } = string.Empty;
    public string MessageKey { get; set; } = string.Empty;
    public string Body { get; set; } = string.Empty;
    public long CreationTime { get; set; }
    public bool IsDeleted { get; set; }
    public bool IsSent { get; set; }
    public string SendFailReason { get; set; } = string.Empty;
    public int RetryCount { get; set; } = 0;
    public string ServiceInstanceId { get; set; } = string.Empty;
    public bool WasAcknowledge { get; set; }
}

public DbSet<OutgoingEventEntity> OutgoingEvents { get; set; }

public async Task<bool> SaveWithEvent(OutgoingEventEntity eventEntity, CancellationToken token)
{
    _dbContext.Add(eventEntity);
    var strategy = _dbContext.Database.CreateExecutionStrategy();

    if (token.IsCancellationRequested)
        return false;

    await strategy.ExecuteInTransactionAsync(
        operation: async () =>
            {
                await _dbContext.SaveChangesAsync(acceptAllChangesOnSuccess: false).ConfigureAwait(false);
            },
        verifySucceeded: async () =>
            {
                return await _dbContext.Set<OutgoingEventEntity>().AsNoTracking().AnyAsync(ee => ee.Id == eventEntity.Id).ConfigureAwait(false);
            }).ConfigureAwait(false);

    _dbContext.ChangeTracker.AcceptAllChanges();
    if (OnSave != null)
    {
        OnSave(this, new ExternalMessageEventArgs()
        {
            ExternalMessage = eventEntity!.ConvertToExternalMessage()
        });
    }
    return true;
}        

Hosted Service for sending message:

Message sender class leverage the de duplicate capability of the message broker.

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Sample.Sdk.Core.Extensions;
using Sample.Sdk.Data.Msg;
using Sample.Sdk.Interface.Caching;
using Sample.Sdk.Interface.Msg;
using static Sample.Sdk.Core.Extensions.AggregateExceptionExtensions;

namespace Sample.Sdk.Core.Msg
{
    public class MessageSenderService : IMessageRealtimeService, ISendExternalMessage
    {
        private readonly ILogger<MessageSenderService> _logger;

        private static Lazy<IInMemoryCollection<ExternalMessage>> eventListToSend = new Lazy<IInMemoryCollection<ExternalMessage>>(
            () =>
            {
                return new InMemoryCollection<ExternalMessage>();
            }, true);

        private static Lazy<IInMemoryCollection<string>> eventListSent = new Lazy<IInMemoryCollection<string>>(
            () =>
            {
                return new InMemoryCollection<string>();
            }, true);

        private static Lazy<IInMemoryCollection<MessageFailed>> failedMessage = new Lazy<IInMemoryCollection<MessageFailed>>(
            () =>
            {
                return new InMemoryCollection<MessageFailed>();
            }, true);

        private readonly IInMemoryCollection<ExternalMessage> _eventListToSend = eventListToSend.Value;
        private readonly IInMemoryCollection<string> _eventListSent = eventListSent.Value;
        private readonly IInMemoryCollection<MessageFailed> _failedEventList = failedMessage.Value;
        private readonly IMessageSender _messageSender;
        private readonly IOutgoingMessageProvider _outgoingMessageProvider;

        public MessageSenderService(ILogger<MessageSenderService> logger,
            IMessageSender messageSender,
            IOutgoingMessageProvider outgoingMessageProvider)
        {
            _logger = logger;
            _messageSender = messageSender;
            _outgoingMessageProvider = outgoingMessageProvider;
        }

        public void SendMessage(ExternalMessage externalMessage)
        {
            _eventListToSend.Add(externalMessage);
        }

        /// <summary>
        /// Compute the tasks out of order
        /// </summary>
        /// <param name="cancellationToken">Cancel operation</param>
        /// <returns></returns>
        public async Task Compute(CancellationToken cancellationToken)
        {
            var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = tokenSource.Token;
            var tasks = new List<Task>();
            tasks.AddTaskWithConfigureAwaitFalse(
                        SendMessage(token),
                        ReadEventFromDurableStorage(token),
                        SaveFailedMessage(token),
                        UpdateOutgoingEventEntity(token));
            try
            {
                await Task.WhenAll(tasks).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }
            try
            {
                tokenSource.Cancel();
                tokenSource.Dispose();
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }

        }

        /// <summary>
        /// Send message from event list to send using message sender service.
        /// </summary>
        /// <param name="token">Cancel operation</param>
        /// <returns></returns>
        private async Task SendMessage(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (_eventListToSend.TryTakeAll(out var eventListToSend))
                {
                    eventListToSend.RemoveAll(e => e == null);
                    await Parallel.ForEachAsync(eventListToSend, async (eventEntity, token) =>
                    {
                        await _messageSender.SendMessage(token, eventEntity!,
                                        msg =>
                                        {
                                            //success send
                                            if (msg != null)
                                            {
                                                _eventListSent.TryAdd(msg.Id);
                                            }
                                        }, (msg, reason, exception) =>
                                        {
                                            //error on send
                                            if (msg != null && exception == null)
                                            {
                                                _failedEventList.Add(new MessageFailed()
                                                {
                                                    MessageId = msg.Id,
                                                    SendFailedReason = reason?.ToString() ?? string.Empty
                                                });
                                            }
                                            if (msg != null && exception is ServiceBusException)
                                            {
                                                _eventListToSend.TryAdd(msg);
                                            }
                                        }).ConfigureAwait(false);

                    }).ConfigureAwait(false);
                }

                await Task.Delay(1000, token).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Read from durable storage events saved to be send that has not being sent or failed before.
        /// </summary>
        /// <param name="token">To cancel operation</param>
        /// <returns></returns>
        private async Task ReadEventFromDurableStorage(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    var msgs = await _outgoingMessageProvider.GetMessages(token,
                        (eventEntity) => !eventEntity.IsDeleted && !eventEntity.IsSent && eventEntity.RetryCount == 0)
                        .ConfigureAwait(false);
                    msgs.ToList().ForEach(e =>
                    {
                        _eventListToSend.TryAdd(e);
                    });
                }
                catch (Exception e)
                {
                    e.LogException(_logger.LogCritical);
                }
                await Task.Delay(TimeSpan.FromMinutes(5), token).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Read failed list of event to save to durable storage
        /// </summary>
        /// <param name="token">Cancel the operation</param>
        /// <returns></returns>
        private async Task SaveFailedMessage(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (_failedEventList.TryTakeAll(out var msgs))
                {
                    if (msgs.Any())
                    {
                        var messageFaileds = new List<string>();
                        msgs.ForEach(msg => messageFaileds.Add(msg.MessageId));
                        await _outgoingMessageProvider.UpdateSentMessages(messageFaileds, token,
                            entity =>
                            {
                                var msgFailed = msgs.FirstOrDefault(msg => msg.MessageId == entity.Id);
                                if (msgFailed != null)
                                {
                                    entity.SendFailReason = msgFailed.SendFailedReason;
                                }
                                entity.RetryCount = entity.RetryCount++;
                                return entity;
                            },
                            (messageId, exception) =>
                            {
                                //on error
                                if (messageId != null &&
                                        (exception is Microsoft.EntityFrameworkCore.DbUpdateException ||
                                        exception is OperationCanceledException))
                                {
                                    var msgFailed = msgs.FirstOrDefault(msg => msg.MessageId == messageId);
                                    if (msgFailed != null)
                                    {
                                        _failedEventList.Add(msgFailed);
                                    }
                                }
                            })
                            .ConfigureAwait(false);
                    }
                }

                await Task.Delay(TimeSpan.FromMinutes(5), token).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Update durable storage with message identifier sent
        /// </summary>
        /// <param name="token">To cancel operation</param>
        /// <returns></returns>
        private async Task UpdateOutgoingEventEntity(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (_eventListSent.TryTakeAll(out var listToUpdate))
                {
                    await _outgoingMessageProvider.UpdateSentMessages(listToUpdate!, token,
                    (eventEntity) =>
                    {
                        eventEntity.IsSent = true;
                        return eventEntity;
                    },
                    (failMsgId, exception) =>
                    {
                        if (failMsgId != null
                            && (exception is Microsoft.EntityFrameworkCore.DbUpdateException
                                || exception is OperationCanceledException))
                        {
                            _eventListSent.TryAdd(failMsgId);
                        }
                    })
                    .ConfigureAwait(false);
                }

                await Task.Delay(TimeSpan.FromMilliseconds(100), token).ConfigureAwait(false);
            }
        }
    }
}
        

Hosted Service for receiving message:

In coming messages are being computed within the message broker transaction. Messages that are bound to long running operations out of order, are stored in an in memory thread safe with de duplicate capability.

Hosted Service source code:

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Sample.Sdk.Core.Caching;
using Sample.Sdk.Core.Extensions;
using Sample.Sdk.Data;
using Sample.Sdk.Data.Entities;
using Sample.Sdk.Data.Msg;
using Sample.Sdk.Data.Options;
using Sample.Sdk.Interface.Caching;
using Sample.Sdk.Interface.Msg;
using Sample.Sdk.Interface.Security;
using Sample.Sdk.Interface.Security.Asymetric;
using System.Linq.Expressions;
using System.Text.Json;
using static Sample.Sdk.Data.Enums.Enums;

namespace Sample.Sdk.Core.Msg
{
    public class MessageReceiverService : IMessageRealtimeService
    {
        private class ComputedMessage : Message
        {
            public InComingEventEntity EventEntity { get; init; }
            public Expression<Func<InComingEventEntity, bool>> PropertyToUpdate { get; init; } = default;
            public string Id { get => EventEntity.Id; set => throw new NotImplementedException(); }
        }

        private static Lazy<IInMemoryDeDuplicateCache<InComingEventEntity>> inComingEventEntity = new(
            () =>
            {
                return new InMemoryDeDuplicateCache<InComingEventEntity>(
                    MemoryCacheState<string, string>.Instance(),
                    NullLoggerFactory.Instance.CreateLogger<InMemoryDeDuplicateCache<InComingEventEntity>>());
            }, true);
        private static Lazy<IInMemoryDeDuplicateCache<InCompatibleMessage>> inCompatibleMessage = new(
            () =>
            {
                return new InMemoryDeDuplicateCache<InCompatibleMessage>(
                    MemoryCacheState<string, string>.Instance(),
                    NullLoggerFactory.Instance.CreateLogger<InMemoryDeDuplicateCache<InCompatibleMessage>>());
            }, true);
        private static Lazy<IInMemoryDeDuplicateCache<CorruptedMessage>> corruptedMessages = new(
            () =>
            {
                return new InMemoryDeDuplicateCache<CorruptedMessage>(
                    MemoryCacheState<string, string>.Instance(),
                    NullLoggerFactory.Instance.CreateLogger<InMemoryDeDuplicateCache<CorruptedMessage>>());
            }, true);
        private static Lazy<IInMemoryDeDuplicateCache<InComingEventEntity>> ackMessages = new(
            () =>
            {
                return new InMemoryDeDuplicateCache<InComingEventEntity>(
                    MemoryCacheState<string, string>.Instance(),
                    NullLoggerFactory.Instance.CreateLogger<InMemoryDeDuplicateCache<InComingEventEntity>>());
            }, true);
        private static Lazy<IInMemoryDeDuplicateCache<ComputedMessage>> persistMessages = new(
            () =>
            {
                return new InMemoryDeDuplicateCache<ComputedMessage>(
                    MemoryCacheState<string, string>.Instance(),
                    NullLoggerFactory.Instance.CreateLogger<InMemoryDeDuplicateCache<ComputedMessage>>());
            }, true);

        private readonly IMessageComputation _computations;
        private readonly IComputeExternalMessage _computeExternalMessage;
        private readonly IInMemoryDeDuplicateCache<InComingEventEntity> _inComingEvents = inComingEventEntity.Value;
        private readonly IInMemoryDeDuplicateCache<InCompatibleMessage> _incompatibleMessages = inCompatibleMessage.Value;
        private readonly IInMemoryDeDuplicateCache<CorruptedMessage> _corruptedMessages = corruptedMessages.Value;
        private readonly IInMemoryDeDuplicateCache<InComingEventEntity> _ackMessages = ackMessages.Value;
        private readonly IInMemoryDeDuplicateCache<ComputedMessage> _persistMessages = persistMessages.Value;

        private readonly IAsymetricCryptoProvider _asymetricCryptoProvider;
        private readonly IServiceProvider _serviceProvider;
        private readonly IOptions<List<AzureMessageSettingsOptions>> _messagingOptions;
        private readonly IMessageSender _messageSender;
        private readonly IMessageReceiver _messageBusReceiver;
        private readonly ILogger<MessageReceiverService> _logger;
        private readonly IMessageCryptoService _cryptoService;
        private CancellationTokenSource? _cancellationTokenSource;
        public MessageReceiverService(
            IMessageComputation computations,
            IComputeExternalMessage computeExternalMessage,
            IMessageReceiver messageBusReceiver,
            ILogger<MessageReceiverService> logger,
            IMessageCryptoService cryptoService,
            IAsymetricCryptoProvider asymetricCryptoProvider,
            IServiceProvider serviceProvider,
            IOptions<List<AzureMessageSettingsOptions>> messagingOptions,
            IMessageSender messageSender)
        {
            _computations = computations;
            _computeExternalMessage = computeExternalMessage;
            _messageBusReceiver = messageBusReceiver;
            _logger = logger;
            _cryptoService = cryptoService;
            _asymetricCryptoProvider = asymetricCryptoProvider;
            _serviceProvider = serviceProvider;
            _messagingOptions = messagingOptions;
            _messageSender = messageSender;
        }

        public async Task Compute(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = _cancellationTokenSource.Token;
            var tasks = new List<Task>() 
            {
                ReceiveMessages(token),
                RetrieveInComingEventEntityFromDatabase(token),
                ComputeReceivedMessage(token),
                UpdateEventStatus(token),
                RetrieveAcknowledgementMessage(token),
                SendAckMessages(token)
            };
            tasks.ForEach(t => t.ConfigureAwait(false));
            try
            {
                await Task.WhenAll(tasks).ConfigureAwait(false);
            }
            catch (OperationCanceledException oce)
            {
                oce.LogException(_logger.LogCritical);
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
                _cancellationTokenSource?.Cancel();
            }
            finally
            {
                _cancellationTokenSource?.Dispose();
            }
        }

        private async Task SendAckMessages(CancellationToken cancellationToken)
        {
            var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = tokenSource.Token;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    while (_ackMessages.TryTakeAllWithoutDuplicate(out var messages, token))
                    {
                        var msgToSend = messages.ToList().ConvertAll(msg => msg.ConvertToExternalMessage());

                        await _messageSender.SendMessages((msg) => msg.AckQueueName, msgToSend, onSent: msgs =>
                        {
                            msgs.ToList().ForEach(msg =>
                            {
                                var computedMessage = new ComputedMessage()
                                {
                                    EventEntity = msg.ConvertToInComingEventEntity(),
                                    PropertyToUpdate = (message) => message.WasAcknowledge
                                };
                                computedMessage.EventEntity.WasAcknowledge = true;
                                _persistMessages.TryAdd(computedMessage);
                            });

                        }, 
                        onError : (msgs, exception) =>
                            {

                            }, token).ConfigureAwait(false);

                        await Task.Delay(1000, token).ConfigureAwait(false);
                    }
                    await Task.Delay(1000, token).ConfigureAwait(false);
                }

            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }
            finally
            {
                tokenSource.Dispose();
            }
        }

        private async Task RetrieveAcknowledgementMessage(CancellationToken cancellationToken)
        {
            var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = tokenSource.Token;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    using var scope = _serviceProvider.CreateScope();
                    var messages = await _computations.GetInComingEventsAsync(
                                                    scope,
                                                    (incomingEvent) => !incomingEvent.IsDeleted &&
                                                                        !incomingEvent.WasAcknowledge &&
                                                                        incomingEvent.WasProcessed,
                                                    token).ConfigureAwait(false);
                    if (messages != null)
                    {
                        foreach (var msg in messages)
                        {
                            _ackMessages.TryAdd(msg);
                        }
                    }
                    await Task.Delay(TimeSpan.FromMinutes(5));
                }
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }
            finally
            {
                tokenSource.Dispose();
            }
        }

        private async Task UpdateEventStatus(CancellationToken cancellationToken)
        {
            var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = cancellationTokenSource.Token;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    while (_persistMessages.TryTakeAllWithoutDuplicate(out var computedMessages, token))
                    {
                        foreach (var computedMessage in computedMessages)
                        {
                            try
                            {
                                await _computations.UpdateEventStatus(_serviceProvider,
                                                            computedMessage.EventEntity,
                                                            computedMessage.PropertyToUpdate,
                                                            token)
                                    .ConfigureAwait(false);
                            }
                            catch (OperationCanceledException) { throw; }
                            catch (DbUpdateException) { throw; }
                            catch (NotSupportedException) { throw; }
                            catch (ObjectDisposedException) { throw; }
                            catch (InvalidOperationException) { throw; }
                            catch (Exception e)
                            {
                                e.LogException(_logger.LogCritical);
                                await Task.Delay(1000, token).ConfigureAwait(false);
                            }
                        }
                        await Task.Delay(TimeSpan.FromSeconds(30), token).ConfigureAwait(false);
                    }
                    await Task.Delay(TimeSpan.FromSeconds(30), token).ConfigureAwait(false);
                }
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }
            finally
            {
                cancellationTokenSource.Dispose();
            }
        }

        private async Task ComputeReceivedMessage(CancellationToken cancellationToken)
        {
            var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = cancellationTokenSource.Token;

            try
            {
                while (!token.IsCancellationRequested)
                {
                    while (_inComingEvents.TryTakeAllWithoutDuplicate(out var messages, token))
                    {
                        token.ThrowIfCancellationRequested();
                        await Parallel.ForEachAsync(messages, token, async (message, token) =>
                        //foreach (var message in messages)
                        {
                            EncryptedMessage? encryptedMessage = null;
                            try
                            {
                                encryptedMessage = JsonSerializer.Deserialize<EncryptedMessage>(message.Body);
                            }
                            catch (Exception e)
                            {
                                e.LogException(_logger.LogCritical);
                                _incompatibleMessages.TryAdd(new InCompatibleMessage()
                                {
                                    OriginalMessageKey = message.MessageKey,
                                    Id = message.Id,
                                    EncryptedContent = message.Body,
                                    OriginalType = message.Type,
                                    InCompatibleType = nameof(EncryptedMessage)
                                });
                                return;
                            }

                            (bool wasEncrypted, List<KeyValuePair<string, string>> externalMsg, EncryptionDecryptionFail reason) decryptorResult;
                            try
                            {
                                decryptorResult = await _cryptoService.GetDecryptedExternalMessage(encryptedMessage!,
                                                                                                    cancellationToken)
                                                                        .ConfigureAwait(false);
                                if (decryptorResult.wasEncrypted)
                                {
                                    using var scope = _serviceProvider.CreateScope();
                                    await _computeExternalMessage.ProcessExternalMessage(decryptorResult.externalMsg!, token).ConfigureAwait(false);
                                    message.WasProcessed = true;
                                    var computedMessage = new ComputedMessage() { EventEntity = message, PropertyToUpdate = (msg) => msg.WasProcessed };
                                    _persistMessages.TryAdd(computedMessage);
                                    _ackMessages.TryAdd(message);
                                }
                            }
                            catch (OperationCanceledException) { throw; }
                            catch (Exception e)
                            {
                                e.LogException(_logger.LogCritical);
                            }
                        });
                        await Task.Delay(1000, token).ConfigureAwait(false);
                    }
                    await Task.Delay(1000, token).ConfigureAwait(false);
                }
            }
            catch (Exception e)
            {
                e.LogException(_logger.LogCritical);
            }
            finally
            {
                cancellationTokenSource.Dispose();
            }
        }

        /// <summary>
        /// It will query the table on a delayed time as event entity are added into the realtime computation as they arrive.
        /// It serve as gurantee to not miss event.
        /// It might produce duplicate, the message realtime de duplicate message as they are being retrieved.
        /// </summary>
        /// <param name="token"></param>
        /// <returns></returns>
        private Task RetrieveInComingEventEntityFromDatabase(CancellationToken cancellationToken)
        {
            var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = tokenSource.Token;
            return Task.Run(async () =>
            {
                try
                {
                    while (!token.IsCancellationRequested)
                    {
                        using var scope = _serviceProvider.CreateScope();
                        await _computations.GetInComingEventsAsync(
                                    scope,
                                    (eventEntity) => !eventEntity.IsDeleted &&
                                                        !eventEntity.WasAcknowledge &&
                                                        !eventEntity.WasProcessed,
                                    token).ConfigureAwait(false);
                        await Task.Delay(TimeSpan.FromMinutes(5), token).ConfigureAwait(false);
                    }

                }
                catch (Exception e)
                {
                    e.LogException(_logger.LogCritical);
                }
                finally
                {
                    tokenSource.Dispose();
                }
            }, token);
        }

        /// <summary>
        /// Retrieve message from azure service bus and save them in the database table incoming event entity, 
        /// also add the event into the realtime computation
        /// Use token cancellation source to stop down the stream tasks
        /// 
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        private Task ReceiveMessages(CancellationToken cancellationToken)
        {
            CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var token = tokenSource.Token;
            return Task.Run(async () =>
            {       
                try
                {
                    var settings = _messagingOptions.Value.Where(item => item.ConfigType ==
                                                            AzureMessageSettingsOptionType.Receiver)
                                                    .ToList();
                    var tasks = new List<Task>();
                    foreach (var msgSettings in settings)
                    {   
                        foreach (var msgInTransitOptions in msgSettings.MessageInTransitOptions)
                        {
                            var task = Task.Run(async () =>
                            {
                                await _messageBusReceiver.ReceiveMessages(msgInTransitOptions.MsgQueueName,
                                    async (extMsg) =>
                                    {
                                        using var scope = _serviceProvider.CreateScope();
                                        var msg = extMsg.ConvertToInComingEventEntity();
                                        await _computations.SaveInComingEventEntity(scope, msg, token).ConfigureAwait(false);
                                        _inComingEvents.TryAdd(msg);
                                        return true;
                                    }, token).ConfigureAwait(false);
                            }, token);
                            task.ConfigureAwait(false);
                            tasks.Add(task);
                        }
                    }
                    await Task.WhenAll(tasks);
                }
                catch (Exception e)
                {
                    e.LogException(_logger.LogCritical);
                }
                finally
                {
                    tokenSource.Dispose();
                }
            }, token);
        }
    }
}
        

A message is stored on an in Memory Collection that is thread safe and provides deduplicate capability for long running process out of order.

Azure ServiceBus message sender source code:

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Sample.Sdk.Data.Msg;
using System.Text.Json;
using static Sample.Sdk.Core.Extensions.AggregateExceptionExtensions;
using Sample.Sdk.Data.Options;
using Sample.Sdk.Interface.Msg;
using static Sample.Sdk.Data.Enums.Enums;

namespace Sample.Sdk.Core.Msg
{
    /// <summary>
    /// Message sender implementation using azure service bus
    /// </summary>
    public partial class ServiceBusMessageSender : ServiceBusFactory, IMessageSender
    {
        private class Message
        {
            public string EndpointAndQueue { get; init; } = string.Empty;
            public List<ExternalMessage> ExternalMessages { get; init; } = new List<ExternalMessage>();
        }
        private ILogger<ServiceBusMessageSender> _logger;
        public ServiceBusMessageSender(ILoggerFactory loggerFactory
            , IOptions<List<AzureMessageSettingsOptions>> serviceBusInfoOptions
            , ServiceBusClient serviceBusClient) :
            base(serviceBusInfoOptions
                , serviceBusClient)
        {
            _logger = loggerFactory.CreateLogger<ServiceBusMessageSender>();
        }


        /// <summary>
        /// Send messages
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <param name="messages">List of messages to send</param>
        /// <param name="onSent">Invoked per message successful sent</param>
        /// <returns cref="int">Amount of successful sent messages</returns>
        public async Task<(bool WasSent, SendFailedReason Reason)>
            SendMessage(CancellationToken token,
                    ExternalMessage msg,
                    Action<ExternalMessage> onSent,
                    Action<ExternalMessage, SendFailedReason?, Exception?> onError)
        {
            var sender = GetSender(msg.MsgQueueName, msg.MsgQueueEndpoint);
            if (sender == null)
            {
                onError?.Invoke(msg, SendFailedReason.InValidSenderEndpoint | SendFailedReason.InValidQueueName, null);
                return (false, SendFailedReason.InValidQueueName | SendFailedReason.InValidSenderEndpoint);
            }
            try
            {
                token.ThrowIfCancellationRequested();
                var serviceBusMsg = new ServiceBusMessage()
                {
                    ContentType = MsgContentType,
                    MessageId = msg.EntityId,
                    CorrelationId = msg.CorrelationId,
                    Body = new BinaryData(JsonSerializer.Serialize(msg))
                };
                await sender.SendMessageAsync(serviceBusMsg, token).ConfigureAwait(false);
                onSent?.Invoke(msg);
            }
            catch (Exception e)
            {
                onError?.Invoke(msg, default, e);
                e.LogException(_logger.LogCritical);
            }
            return (true, default);
        }

        /// <summary>
        /// Send messages. All messages would be send to the same queue.
        /// </summary>
        /// <param name="messages"></param>
        /// <param name="onSent"></param>
        /// <param name="onError"></param>
        /// <param name="token"></param>
        /// <returns></returns>
        public async Task<bool> SendMessages(Func<ExternalMessage, string> getQueue,
            IEnumerable<ExternalMessage> messages,
            Action<IEnumerable<ExternalMessage>> onSent,
            Action<IEnumerable<ExternalMessage>, Exception> onError,
            CancellationToken token)
        {
            var queueGrouped = from message in messages
                               let queueName = $"{message.MsgQueueEndpoint}{message.MsgQueueName}"
                               group message by queueName into groupedMessages
                               select new Message()
                               {
                                   EndpointAndQueue = groupedMessages.Key,
                                   ExternalMessages = groupedMessages.ToList()
                               };

            foreach (var message in queueGrouped)
            {
                if (!message.ExternalMessages.Any())
                    continue;
                do
                {
                    var counter = 0;
                    var toSend = message.ExternalMessages.TakeWhile(msg =>
                    {
                        if (counter >= 100)
                            return false;
                        counter++;
                        return true;
                    }).ToList();
                    var sender = GetSender(getQueue(toSend.First()), toSend.First().MsgQueueEndpoint);
                    if (sender == null)
                        throw new InvalidOperationException($"Sender not found for queue {getQueue(toSend.First())}");
                    try
                    {
                        var msgBatch = await sender!.CreateMessageBatchAsync(token).ConfigureAwait(false);
                        toSend.ToList().ForEach(msg =>
                        {
                            var serviceBusMsg = new ServiceBusMessage()
                            {
                                CorrelationId = msg.CorrelationId,
                                MessageId = msg.Id,
                                Body = new BinaryData(JsonSerializer.Serialize(msg))
                            };
                            msgBatch.TryAddMessage(serviceBusMsg);
                        });
                        await sender.SendMessagesAsync(msgBatch, token).ConfigureAwait(false);
                    }
                    catch (Exception exception)
                    {
                        onError(toSend, exception);
                    }
                    try
                    {
                        onSent(toSend);
                    }
                    catch (Exception exception)
                    {
                        exception.LogException(_logger.LogCritical);
                    }
                    message.ExternalMessages.RemoveRange(0, toSend.Count());
                }
                while (message.ExternalMessages.Any());
            }
            return true;
        }
    }
}
        

Azure ServiceBus message receiver source code:

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Sample.Sdk.Core.Extensions;
using Sample.Sdk.Data.Entities;
using Sample.Sdk.Data.Msg;
using Sample.Sdk.Data.Options;
using Sample.Sdk.Interface.Msg;
using System.Text;
using System.Text.Json;
using static Sample.Sdk.Core.Extensions.ExternalMessageExtensions;

namespace Sample.Sdk.Core.Msg
{
    public class ServiceBusMessageReceiver : ServiceBusFactory, IMessageReceiver
    {
        private readonly ILogger<ServiceBusMessageReceiver> _logger;
        public ServiceBusMessageReceiver(
            IOptions<List<AzureMessageSettingsOptions>> serviceBusInfoOptions
            , ServiceBusClient service
            , ILoggerFactory loggerFactory) :
            base(serviceBusInfoOptions
                , service)
        {
            _logger = loggerFactory.CreateLogger<ServiceBusMessageReceiver>();
        }

        /// <summary>
        /// Retrieve message from the acknowledgement queue
        /// </summary>
        /// <param name="queueName">Acknowsledgement queue name</param>
        /// <param name="messageProcessor">Process acknowledgement message</param>
        /// <param name="token">Cancel operation</param>
        /// <returns></returns>
        /// <exception cref="InvalidOperationException"></exception>
        /// <exception cref="ArgumentNullException"></exception>
        public async Task ReceiveMessages(string queueName,
            Func<ExternalMessage, Task<bool>> messageProcessor,
            CancellationToken token)
        {
            token.ThrowIfCancellationRequested();
            var serviceProcessor = GetServiceBusProcessor(queueName, () =>
            {
                return new ServiceBusProcessorOptions()
                {
                    AutoCompleteMessages = true,
                    ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
                    MaxConcurrentCalls = Environment.ProcessorCount
                };
            });
            serviceProcessor.ProcessMessageAsync += async (args) =>
            {
                var externalMsg = JsonSerializer.Deserialize<ExternalMessage>(Encoding.UTF8.GetString(args.Message.Body.ToArray()));
                if (externalMsg != null)
                    await messageProcessor.Invoke(externalMsg);
            };
            serviceProcessor.ProcessErrorAsync +=  (args) => 
            {
                args.Exception.LogException(_logger.LogCritical);
                return Task.CompletedTask;
            }; 
            await serviceProcessor.StartProcessingAsync(token).ConfigureAwait(false);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="token"></param>
        /// <param name="saveEntity"></param>
        /// <param name="queueName"></param>
        /// <returns></returns>
        /// <exception cref="InvalidOperationException"></exception>
        /// <exception cref="ApplicationException"></exception>
        public async Task<ExternalMessage> Receive(CancellationToken token
            , Func<InComingEventEntity, CancellationToken, Task<bool>> saveEntity
            , string queueName = "employeeadded")
        {
            var receiver = GetReceiver(queueName);
            if (receiver == null)
                throw new InvalidOperationException("Receiver not found");

            token.ThrowIfCancellationRequested();

            var message = await receiver.ReceiveMessageAsync(null, token);
            if (message == null)
            {
                return null;
            }
            if (message.ContentType != MsgContentType)
            {
                throw new ApplicationException("Invalid event content type");
            }
            var msgReceivedBytes = message.Body.ToMemory().ToArray();
            var receivedStringMsg = Encoding.UTF8.GetString(msgReceivedBytes);
            var externalMsg = JsonSerializer.Deserialize<ExternalMessage>(receivedStringMsg);
            if (externalMsg == null)
            {
                throw new ApplicationException("Invalid event message");
            }
            var inComingEvent = externalMsg.ConvertToInComingEventEntity();

            token.ThrowIfCancellationRequested();

            var result = await saveEntity(inComingEvent, token);
            if (!result)
            {
                await receiver.AbandonMessageAsync(message, null, token);
            }
            else
            {
                await receiver.CompleteMessageAsync(message, token);
            }
            return externalMsg;
        }
    }
}
        

Keeping track of duplicate message:

An in memory thread safe collection class for handling duplicate message. An asynchronous task read from the database to populate in memory collections with events to be send. An asynchronous task is responsible for reading from the in memory collection to dipath the events to the message broker:

Source code of the in memory collection:

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Sample.Sdk.Data;
using Sample.Sdk.Interface.Caching;
using System.Collections.Concurrent;

namespace Sample.Sdk.Core.Caching
{
    /// <summary>
    /// Use BlockingCollection to implement the producer consumer pattern in a thread-safe environment
    /// Use memory cache to keep track of item taked to avoid duplicate for 10 minutes
    /// </summary>
    /// <typeparam name="T">Is a class</typeparam>
    public class InMemoryDeDuplicateCache<T> : IInMemoryDeDuplicateCache<T> where T : Message
    {
        private readonly BlockingCollection<T> _state = new BlockingCollection<T>();
        private readonly IMemoryCacheState<string, string> _identifiersCache;
        private readonly ILogger<InMemoryDeDuplicateCache<T>> _logger;

        public int Count => _state.Count;

        public InMemoryDeDuplicateCache(
            IMemoryCacheState<string, string> identifiersCache,
            ILogger<InMemoryDeDuplicateCache<T>> logger)
        {
            _identifiersCache = identifiersCache;
            _logger = logger;
        }

        /// <summary>
        /// Thread-safe operation for adding object in a concurrent scneario of adding and removing items
        /// </summary>
        /// <param name="item">Item to add</param>
        /// <returns>True if the item was added otherwise false</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidOperationException"></exception>
        /// <exception cref="ObjectDisposedException"></exception>
        public bool TryAdd(T item)
        {
            if (item == null)
                throw new ArgumentNullException("item");
            return _state.TryAdd(item);
        }

        /// <summary>
        /// Thread-safe method to retrieve objects concurrently
        /// </summary>
        /// <param name="item">Object to add</param>
        /// <returns>True when item was retuened otherwise false</returns>
        /// <exception cref="InvalidOperationException"></exception>
        /// <exception cref="ObjectDisposedException"></exception>
        public bool TryTake(out T? item)
        {
            try
            {
                if (_state.TryTake(out item))
                {
                    return true;
                }
            }
            catch (Exception e)
            {
                _logger.LogCritical(e, "An error ocurred when taking an item from collection");
            }
            item = default;
            return false;
        }

        public bool TryTakeAll(out List<T?> items)
        {
            items = new List<T?>();
            while (TryTake(out T? item))
            {
                if (item != null)
                    items.Add(item);
            }
            return items.Any();
        }

        /// <summary>
        /// Call this method when processing a fail message and it will be retried again
        /// </summary>
        /// <param name="item">To be removed</param>
        /// <returns></returns>
        public bool TryAddAndRemoveCache(T? item)
        {
            if (item == null || _state == null)
                return false;
            try
            {
                _identifiersCache.Cache.Remove(item.Id);
                _state.Add(item);
                return true;
            }
            catch (Exception e)
            {
                _logger.LogCritical(e, "An error ocurred when adding and removing item to memory collection");
                return false;
            }
        }
        /// <summary>
        /// Loop taking item until an item is not found in cache or cancellation request is true.
        /// </summary>
        /// <param name="item">Item from collection</param>
        /// <param name="token">Cancellation token used to stop the loop</param>
        /// <returns></returns>
        public bool TryTakeDeDuplicate(out T? item, CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested && _state.TryTake(out item))
                {
                    if (!_identifiersCache.Cache.TryGetValue(item.Id, out string foundItem))
                    {
                        _identifiersCache.Cache.Set(
                            item.Id,
                            string.Empty,
                            absoluteExpiration: DateTimeOffset.UtcNow.AddDays(1));
                        return true;
                    }
                }
            }
            catch (Exception e)
            {
                _logger.LogCritical(e, "An error ocurred when pulling objects from in memory collection");
            }
            item = default;
            return false;
        }
        /// <summary>
        /// Use for loop
        /// </summary>
        /// <param name="pageSize">Return a enumerable limited to page size. When pageSize is equal or less than cero it return all items found</param>
        /// <param name="items">IEnumerable of items</param>
        /// <param name="token">Cancellation token</param>
        /// <returns></returns>
        public bool TryTakeAllWithoutDuplicate(out IList<T> items, CancellationToken token, int pageSize = 0)
        {
            int counter = 0;
            items = new List<T>();
            while ((counter < pageSize || pageSize <= 0) //page condition
                && TryTakeDeDuplicate(out var item, token))
            {
                items.Add(item!);
                if (!(pageSize <= 0))
                    counter++;
            }
            return items.Count > 0;
        }
    }
}
        


References:

Nick Chapsas created a solution using MassTransict libraries.

https://www.youtube.com/watch?v=032SfEBFIJs

Chris Richadson documented this pattern:

https://microservices.io/patterns/data/transactional-outbox.html




要查看或添加评论,请登录

Yusbel Garcia Diaz的更多文章

  • Class for Handling Duplicate Message

    Class for Handling Duplicate Message

    Derek Comartin wrote a post about duplicate messages, covering most cases. I wrote a code back in 2023 for handling…

    1 条评论
  • ChatGPT -> Analysis of My Resume

    ChatGPT -> Analysis of My Resume

    I requested ChatGPT a career and impact summary along with role and industry I should target. I like the response: Chat…

  • Data access

    Data access

    Back in 2023 (Off work) and followed on 2024, I wrote a repository class that encapsulate LINQ queries. While this…

    1 条评论
  • Mediator pattern

    Mediator pattern

    I have been criticized for implementing the mediator pattern in many companies. Everyone rightly want to use MediaR.

  • Open to work

    Open to work

    I have been looking for work since the end of Dec 2024. Since then, I started taking courses and applying for work.

  • Software design patterns

    Software design patterns

    A personal opinion on design patterns. When to use them? Most code base follow design patterns, even those that are…

  • About me

    About me

    A generic answer to questions about personality and point of view. I'm looking to raise my case to secure a job.

  • Career

    Career

    Brief People who have stepped out of the workforce face a tremendous challenge in getting back to work, regardless of…

    2 条评论
  • Calculate Min and Max

    Calculate Min and Max

    Given an array of integers and a k value to remove k numbers from the array return Min and Max. Example: arr = [1, 5…

  • Decouple components with domain events

    Decouple components with domain events

    A technical building block for decoupling components. Most applications are required to have independent, discrete…

社区洞察

其他会员也浏览了