Transactional Outbox Plugin for SlimMessageBus

Please read the Introduction before reading this provider documentation.

Introduction

The Host.Outbox plugin adds the Transactional Outbox pattern to SlimMessageBus.

PostgreSQL

SQL Server

The Outbox plugin works in combination with any transport provider.

Configuration

Entity Framework

PostgreSQL

Required: SlimMessageBus.Host.Outbox.PostgreSql.DbContext

using SlimMessageBus.Host.Outbox.PostgreSql.DbContext;

SQL Server

Required: SlimMessageBus.Host.Outbox.Sql.DbContext

using SlimMessageBus.Host.Outbox.Sql.DbContext;

Consider the following example (from Samples):

Startup setup:

builder.Services.AddSlimMessageBus(mbb =>
{
    mbb.PerMessageScopeEnabled(false);
    mbb
        .AddChildBus("Memory", mbb =>
        {
            mbb.WithProviderMemory()
                .AutoDeclareFrom(Assembly.GetExecutingAssembly(), consumerTypeFilter: t => t.Name.EndsWith("CommandHandler"));
            //.UseTransactionScope(messageTypeFilter: t => t.Name.EndsWith("Command")) // Consumers/Handlers of messages whose type name ends with "Command" will be wrapped in a TransactionScope
            //.UseSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers of messages whose type name ends with "Command" will be wrapped in a SqlTransaction

            switch (dbProvider)
            {
                case DbProvider.SqlServer:
                    mbb.UseSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers of messages whose type name ends with "Command" will be wrapped in a SqlTransaction
                    break;

                case DbProvider.PostgreSql:
                    mbb.UsePostgreSqlTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); // Consumers/Handlers of messages whose type name ends with "Command" will be wrapped in a PostgreSqlTransaction
                    break;
            }
        })
        .AddChildBus("AzureSB", mbb =>
        {
            mbb
                .Handle<CreateCustomerCommand, Guid>(s =>
                {
                    s.Topic("samples.outbox/customer-events", t =>
                    {
                        t.WithHandler<CreateCustomerCommandHandler, CreateCustomerCommand>()
                            .SubscriptionName("CreateCustomer");
                    });
                })
                .WithProviderServiceBus(cfg =>
                {
                    cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
                    cfg.TopologyProvisioning.CanProducerCreateTopic = true;
                    cfg.TopologyProvisioning.CanConsumerCreateQueue = true;
                    cfg.TopologyProvisioning.CanConsumerReplaceSubscriptionFilters = true;
                })
                .Produce<CustomerCreatedEvent>(x =>
                {
                    x.DefaultTopic("samples.outbox/customer-events");
                    // OR if you want just this producer to send via the outbox
                    // x.UseOutbox();
                })
                // All outgoing messages from this bus will go out via an outbox
                .UseOutbox(/* messageTypeFilter: t => t.Name.EndsWith("Command") */); // Optionally, a filter can be applied to determine which messages go out via the outbox
        })
        .AddServicesFromAssembly(Assembly.GetExecutingAssembly())
        .AddJsonSerializer()
        .AddAspNet();

    switch (dbProvider)
    {
        case DbProvider.SqlServer:
            SlimMessageBus.Host.Outbox.Sql.DbContext.MessageBusBuilderExtensions.AddOutboxUsingDbContext<CustomerContext>(mbb, opts =>
            {
                opts.PollBatchSize = 500;
                opts.PollIdleSleep = TimeSpan.FromSeconds(10);
                opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
                opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
                //opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
                //opts.SqlSettings.Dialect = SqlDialect.SqlServer;
            });

            break;

        case DbProvider.PostgreSql:
            SlimMessageBus.Host.Outbox.PostgreSql.DbContext.MessageBusBuilderExtensions.AddOutboxUsingDbContext<CustomerContext>(mbb, opts =>
            {
                opts.PollBatchSize = 500;
                opts.PollIdleSleep = TimeSpan.FromSeconds(10);
                opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
                opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
                //opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
                //opts.SqlSettings.Dialect = SqlDialect.SqlServer;
            });

            break;
    }
});

Command handler:

public record CreateCustomerCommandHandler(IMessageBus Bus, CustomerContext CustomerContext) : IRequestHandler<CreateCustomerCommand, Guid>
{
    public async Task<Guid> OnHandle(CreateCustomerCommand request, CancellationToken cancellationToken)
    {
        // Note: This handler will be already wrapped in a transaction: see Program.cs and .UseTransactionScope() / .UseSqlTransaction() 

        var customer = new Customer(request.Firstname, request.Lastname);
        await CustomerContext.Customers.AddAsync(customer, cancellationToken);
        await CustomerContext.SaveChangesAsync(cancellationToken);

        // Announce to anyone outside of this micro-service that a customer has been created (this will go out via a transactional outbox)
        await Bus.Publish(new CustomerCreatedEvent(customer.Id, customer.Firstname, customer.Lastname));

        return customer.Id;
    }
}

Direct Connection

PostgreSQL

Required: SlimMessageBus.Host.Outbox.PostgreSql

using SlimMessageBus.Host.Outbox.PostgreSql;

SQL Server

Required: SlimMessageBus.Host.Outbox.Sql

using SlimMessageBus.Host.Outbox.Sql;

Consider the following example:

builder.Services.AddSlimMessageBus(mbb =>
{
    // Alternatively, if we were not using EF, we could use a SqlConnection
    mbb.AddOutboxUsingSql(opts => { opts.PollBatchSize = 100; });
});

// SMB requires the SqlConnection to be registered in the container
builder.Services.AddTransient(svp =>
{
    var configuration = svp.GetRequiredService<IConfiguration>();
    var connectionString = configuration.GetConnectionString("DefaultConnection");
    return new SqlConnection(connectionString);
});

Options

UseOutbox for Producers

Required: SlimMessageBus.Host.Outbox

using SlimMessageBus.Host.Outbox;

.UseOutbox() can be applied to a producer declaration to route that producer's outgoing messages through the outbox. When applied at the (child) bus level, all producers of that bus inherit the option.
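As a minimal sketch of the two scopes, the fragment below opts a single producer into the outbox and shows the bus-level alternative as a comment. It only reuses calls that appear in the sample above; `OrderShippedEvent` and the `samples.outbox/order-events` topic are hypothetical, and the fragment assumes the SlimMessageBus packages and a transport provider are already configured.

```csharp
using SlimMessageBus.Host.Outbox;

builder.Services.AddSlimMessageBus(mbb =>
{
    mbb.AddChildBus("AzureSB", mbb =>
    {
        // Producer level: only CustomerCreatedEvent goes out via the outbox
        mbb.Produce<CustomerCreatedEvent>(x =>
        {
            x.DefaultTopic("samples.outbox/customer-events");
            x.UseOutbox();
        });

        // Hypothetical second producer that is sent directly (no outbox)
        mbb.Produce<OrderShippedEvent>(x =>
        {
            x.DefaultTopic("samples.outbox/order-events");
        });

        // Bus level alternative: every producer on this child bus inherits the option
        // mbb.UseOutbox();
    });
});
```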

Transactions for Consumers

Each consumer (or handler) can be placed inside an SQL transaction. When a consumer processes a message, SMB automatically starts a transaction; if processing succeeds the transaction is committed, and if an error occurs it is rolled back.

Transactions can be nested. For example, if a consumer (e.g. Azure SB) invokes a command handler (e.g. Memory) and both have transactions enabled, the underlying transaction is committed only once both have finished successfully.

There are three types of transaction support:
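The nesting rule can be pictured with a small stand-alone sketch. `NestedTransactionTracker` is a hypothetical illustration and not SMB's implementation: it only counts nesting levels and settles the single underlying transaction when the outermost consumer finishes.

```csharp
using System;

// Hypothetical illustration only; this is not SMB's implementation.
public sealed class NestedTransactionTracker
{
    private int _depth;
    private bool _failed;

    public bool Committed { get; private set; }
    public bool RolledBack { get; private set; }

    // Called when a consumer with transactions enabled starts processing.
    public void Begin() => _depth++;

    // Called when that consumer finishes; only the outermost call settles the transaction.
    public void End(bool success)
    {
        if (_depth == 0) throw new InvalidOperationException("No transaction in progress.");
        if (!success) _failed = true;

        _depth--;
        if (_depth > 0) return; // an outer consumer is still running

        if (_failed) RolledBack = true;
        else Committed = true;
    }
}

public static class NestedTransactionDemo
{
    public static void Main()
    {
        var tx = new NestedTransactionTracker();
        tx.Begin();             // outer consumer (e.g. Azure SB)
        tx.Begin();             // inner command handler (e.g. Memory)
        tx.End(success: true);  // inner finished: nothing is committed yet
        Console.WriteLine($"After inner: committed={tx.Committed}");
        tx.End(success: true);  // outer finished: the underlying transaction commits
        Console.WriteLine($"After outer: committed={tx.Committed}");
    }
}
```

If either level reports a failure, the tracker marks the whole transaction as rolled back once the outermost level ends, which mirrors the rule that a commit requires both consumers to succeed.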

UseTransactionScope

Required: SlimMessageBus.Host.Outbox

using SlimMessageBus.Host.Outbox;

.UseTransactionScope() can be applied to a consumer (or handler) declaration to make the consumer start a TransactionScope before the message's OnHandle and complete it afterwards. Any exception raised by the consumer causes the transaction to be rolled back.

When applied at the (child) bus level, all consumers (or handlers) inherit the option.
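To make the behaviour concrete, here is a rough sketch of what wrapping a handler in a TransactionScope looks like with plain System.Transactions. `RunInScope` is a hypothetical helper written for illustration; how SMB wires this internally may differ.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class TransactionScopeSketch
{
    // Hypothetical helper: runs the handler inside an ambient transaction.
    public static async Task<T> RunInScope<T>(Func<Task<T>> handler)
    {
        // TransactionScopeAsyncFlowOption.Enabled lets the ambient transaction flow across awaits.
        using var scope = new TransactionScope(
            TransactionScopeOption.Required,
            TransactionScopeAsyncFlowOption.Enabled);

        var result = await handler();

        // Reached only when the handler did not throw.
        // Without Complete(), disposing the scope rolls the transaction back.
        scope.Complete();
        return result;
    }

    public static async Task Main()
    {
        var insideTransaction = await RunInScope(async () =>
        {
            await Task.Yield();
            return Transaction.Current != null; // the handler sees the ambient transaction
        });

        Console.WriteLine($"Handler ran inside a transaction: {insideTransaction}");
    }
}
```

Because the scope uses TransactionScopeOption.Required, a handler invoked from within another scope joins the existing ambient transaction rather than starting a new one.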

UsePostgreSqlTransaction

Required: SlimMessageBus.Host.Outbox.PostgreSql or SlimMessageBus.Host.Outbox.PostgreSql.DbContext

using SlimMessageBus.Host.Outbox.PostgreSql;

.UsePostgreSqlTransaction() can be applied to a consumer (or handler) declaration to make the consumer start a PostgreSqlTransaction before the message's OnHandle and complete it afterwards. Any exception raised by the consumer causes the transaction to be rolled back.

When applied at the (child) bus level, all consumers (or handlers) inherit the option.

PostgreSqlTransaction instances are created from the associated NpgsqlConnection.

UseSqlTransaction

Required: SlimMessageBus.Host.Outbox.Sql or SlimMessageBus.Host.Outbox.Sql.DbContext

using SlimMessageBus.Host.Outbox.Sql;

.UseSqlTransaction() can be applied to a consumer (or handler) declaration to make the consumer start a SqlTransaction before the message's OnHandle and complete it afterwards. Any exception raised by the consumer causes the transaction to be rolled back.

When applied at the (child) bus level, all consumers (or handlers) inherit the option.

SqlTransaction instances are created from the associated SqlConnection.
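Both connection-based options follow the same begin, commit, or roll back shape. The sketch below shows that shape using the provider-neutral ADO.NET base classes, so it applies to a SqlConnection or an NpgsqlConnection alike. `RunInTransaction` is a hypothetical helper for illustration and is not part of SMB.

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

public static class TransactionWrapperSketch
{
    // Hypothetical helper: starts a transaction on the given connection,
    // runs the handler, then commits on success or rolls back on any exception.
    public static async Task<TResult> RunInTransaction<TResult>(
        DbConnection connection,
        Func<DbTransaction, CancellationToken, Task<TResult>> handler,
        CancellationToken cancellationToken = default)
    {
        if (connection.State != ConnectionState.Open)
        {
            await connection.OpenAsync(cancellationToken);
        }

        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
        try
        {
            var result = await handler(transaction, cancellationToken);
            await transaction.CommitAsync(cancellationToken);
            return result;
        }
        catch
        {
            // Any exception raised by the handler rolls the transaction back.
            await transaction.RollbackAsync(cancellationToken);
            throw;
        }
    }
}
```

Passing the transaction to the handler is a design choice of this sketch: plain ADO.NET commands must be associated with the transaction explicitly to take part in it.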

How it works

Clean up

When SMB starts, sent messages older than MessageCleanup.Age are removed from the Outbox table in batches of MessageCleanup.BatchSize until no sent messages of that age remain. The process then repeats every MessageCleanup.Interval.

| Property | Description | Default |
|---|---|---|
| Enabled | True if messages are to be removed | true |
| Interval | Time between executions | 1 hour |
| Age | Minimum age of a sent message to delete | 1 hour |
| BatchSize | Number of messages to be removed in each iteration | 10 000 |
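A configuration sketch that sets all four properties explicitly to their documented defaults. `Interval` and `Age` are set the same way as in the sample above; the `Enabled` and `BatchSize` property names are taken from the table, and their types (`bool` and `int`) are assumptions.

```csharp
using SlimMessageBus.Host.Outbox.Sql;

builder.Services.AddSlimMessageBus(mbb =>
{
    mbb.AddOutboxUsingSql(opts =>
    {
        opts.MessageCleanup.Enabled = true;                    // assumed to be a bool
        opts.MessageCleanup.Interval = TimeSpan.FromHours(1);  // time between cleanup runs
        opts.MessageCleanup.Age = TimeSpan.FromHours(1);       // minimum age of a sent message to delete
        opts.MessageCleanup.BatchSize = 10_000;                // assumed to be an int
    });
});
```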

Important note

Because outbox messages can be processed by an application instance other than the one that originally produced them, it is important that all active instances maintain the same message registrations (and compatible JSON schema definitions).

A message that fails to deserialize is flagged as invalid by setting the associated DeliveryAborted field in the Outbox table to 1. It is safe to manually reset this field to 0 once the version incompatibility has been resolved.