RabbitMQ transport provider for SlimMessageBus

Please read the Introduction before reading this provider documentation.

Underlying client

The RabbitMQ transport provider uses the RabbitMQ.Client library to connect to the RabbitMQ cluster via the AMQP protocol.

Concepts

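RabbitMQ and the AMQP protocol introduce a couple of concepts, notably exchanges (entities to which producers send messages), queues (from which consumers read messages), and bindings (rules that exchanges use to route messages to queues).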

AMQP Concepts provides a brilliant overview.

Configuration

The RabbitMQ transport configuration is arranged via the .WithProviderRabbitMQ(cfg => {}) method on the message bus builder.

using SlimMessageBus.Host.RabbitMQ;

// Register SlimMessageBus in MSDI
services.AddSlimMessageBus((mbb) =>
{
    // Use RabbitMQ transport provider
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // Connect using AMQP URI
        cfg.ConnectionString = configuration["RabbitMQ:ConnectionString"];

        // Alternatively, when not using AMQP URI:
        // cfg.ConnectionFactory.HostName = "..."
        // cfg.ConnectionFactory.VirtualHost = "..."
        // cfg.ConnectionFactory.UserName = "..."
        // cfg.ConnectionFactory.Password = "..."
        // cfg.ConnectionFactory.Ssl.Enabled = true

        // Fine tune the underlying RabbitMQ.Client:
        // cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
    });

    mbb.AddServicesFromAssemblyContaining<PingConsumer>();
    mbb.AddJsonSerializer();
});

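The relevant elements of the cfg are ConnectionString, which accepts an AMQP URI, and ConnectionFactory, which gives access to the settings of the underlying RabbitMQ.Client connection factory.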

Producers

Producers need to declare the name and type of the exchange the message should be delivered to. SMB will provision the specified exchange. Additionally, we can specify a routing key provider and a message properties modifier:

mbb.Produce<OrderEvent>(x => x
        // Will declare an orders exchange of type Fanout
        .Exchange("orders", exchangeType: ExchangeType.Fanout)
        // Will use a routing key provider that, for a given message, takes its Id field
        .RoutingKeyProvider((m, p) => m.Id.ToString())
        // Will use a modifier that sets message properties (here the MessageId) on each outgoing message
        .MessagePropertiesModifier((m, p) =>
        {
            p.MessageId = GetMessageId(m);
        }));

We can also set defaults for all producers on the bus level:

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // All exchanges declared on producers will be durable by default
        cfg.UseExchangeDefaults(durable: true);

        // All messages will get the ContentType message property assigned
        cfg.UseMessagePropertiesModifier((m, p) =>
        {
            p.ContentType = MediaTypeNames.Application.Json;
        });
    });

    mbb.AddJsonSerializer();
});

Consumers

Consumers need to specify the name of the queue they should read from. SMB will provision the specified queue. Additionally, we can specify an exchange binding and a dead letter exchange:

mbb.Consume<PingMessage>(x => x
    // Use the subscriber queue, do not auto delete
    .Queue("subscriber", autoDelete: false)
    // Bind the queue to the ping exchange
    .ExchangeBinding("ping")
    // The queue declaration in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be created
    .DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType.Direct)
    .WithConsumer<PingConsumer>());

We can specify defaults for all consumers on the bus level:

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
        cfg.UseQueueDefaults(durable: false);
    });
});

Acknowledgment Mode

When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments (ack, nack and reject), out of which two (ack and nack) are available in SMB.

In SMB we can set the acknowledgment mode for each consumer:

builder.Consume<PingMessage>(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // Set the acknowledgement mode, the ConfirmAfterMessageProcessingWhenNoManualConfirmMade is the default
    .AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
    .WithConsumer<PingConsumer>());

Alternatively, a bus wide default can be specified for all consumers:

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // Set the acknowledgement mode, the ConfirmAfterMessageProcessingWhenNoManualConfirmMade is the default
        cfg.AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade);
    });
});

See RabbitMqMessageAcknowledgementMode for the available options.

By default (ConfirmAfterMessageProcessingWhenNoManualConfirmMade), a message is acknowledged (Ack) after its processing finishes successfully. If an exception is thrown, the message is rejected (Nack), unless a custom error handler decides otherwise. In this default mode, the consumer or an interceptor can still Ack or Nack the message explicitly using the Ack() / Nack() methods exposed on the ConsumerContext, which allows for manual acknowledgements. The default setting is optimal and safe. However, a message may be delivered more than once (at-least-once delivery).

The other acknowledgement modes ack the message before it is processed. They are less safe, because they lead to at-most-once delivery: a message whose processing fails is not redelivered.
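The manual acknowledgement described above could look roughly like the sketch below. It assumes the SMB v2-style IConsumer<T>.OnHandle(T) signature (newer versions may add a CancellationToken parameter) and that the consumer receives its context through IConsumerWithContext. The PingMessage shape and the validation rule are hypothetical and only serve the illustration.

```csharp
using System.Threading.Tasks;
using SlimMessageBus;
using SlimMessageBus.Host.RabbitMQ; // Ack() / Nack() extension methods

// Hypothetical message shape, for illustration only
public record PingMessage(int Counter);

public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
    // Assigned by SMB before OnHandle is invoked
    public IConsumerContext Context { get; set; }

    public Task OnHandle(PingMessage message)
    {
        if (message.Counter < 0) // hypothetical validation rule
        {
            // Reject explicitly; the broker dead-letters or discards the message
            Context.Nack();
            return Task.CompletedTask;
        }

        // Confirm explicitly; in the default mode SMB will not ack again
        Context.Ack();
        return Task.CompletedTask;
    }
}
```

If neither method is called, the default mode still acknowledges the message once OnHandle completes without an exception.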

Consumer Error Handling

By default the transport implementation performs a negative ack (nack) in the AMQP protocol for any message that failed in the consumer. As a result the message will be marked as failed and either routed to a dead letter exchange or discarded by the RabbitMQ broker.

The recommendation here is to either use a dead letter exchange or provide a custom consumer error handler. Both options are described in the following sections.

Dead Letter Exchange

Dead Letter Exchanges are a RabbitMQ feature that forwards failed messages from a particular queue to a special exchange.

In SMB on the consumer declaration we can specify which dead letter exchange should be used:

mbb.Consume<PingMessage>(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // The queue provisioned in RabbitMQ will have a reference to the dead letter exchange
    .DeadLetterExchange("subscriber-dlq")
    .WithConsumer<PingConsumer>());

However, in this sample SMB will not create the subscriber-dlq exchange. For it to be created, the ExchangeType has to be specified, so that SMB knows which exchange type to apply. It can be specified on the consumer:

mbb.Consume<PingMessage>(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // The queue provisioned in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be provisioned
    .DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType.Direct)
    .WithConsumer<PingConsumer>());

Alternatively, a bus wide default can be specified for all dead letter exchanges:

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // All the declared dead letter exchanges on the consumers will be of Direct type
        cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
    });
});

Custom Consumer Error Handler

Define a custom class implementation of IRabbitMqConsumerErrorHandler<>:

public class CustomRabbitMqConsumerErrorHandler<T> : IRabbitMqConsumerErrorHandler<T>
{
    // Inject the needed dependencies via the constructor

    public async Task<bool> OnHandleError(T message, Func<Task> retry, IConsumerContext consumerContext, Exception exception)
    {
        // Check if this is consumer context for RabbitMQ
        var isRabbitMqContext = consumerContext.GetTransportMessage() != null;
        if (isRabbitMqContext)
        {
            if (exception is TransientException)
            {
                // Send negative acknowledge but ask the broker to retry
                consumerContext.NackWithRequeue();
            }
            else
            {
                // Send negative acknowledge (if dead letter setup it will be routed to it)
                consumerContext.Nack();
            }
            // Mark that the errored message was handled
            return true;
        }
        return false;
    }
}

Then register the implementation in MSDI for all (or specified) message types.

// Register error handler in MSDI for any message type
services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabbitMqConsumerErrorHandler<>));
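To scope the handler to a single message type instead of all of them, a closed generic registration could be used. This is a minimal sketch using standard MSDI registration. PingMessage is taken from the earlier samples, and services is the same IServiceCollection as above.

```csharp
using Microsoft.Extensions.DependencyInjection;
using SlimMessageBus.Host.RabbitMQ;

// Register the error handler in MSDI for the PingMessage type only
services.AddTransient<IRabbitMqConsumerErrorHandler<PingMessage>, CustomRabbitMqConsumerErrorHandler<PingMessage>>();
```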

When no error handler is found in DI for the given message type, or the handler returns false, the default error handling is applied.

See also the common error handling.

Consumer Concurrency Level

By default each consumer in the service process handles one message at a time. To increase the concurrency, set ConsumerDispatchConcurrency to a value greater than 1. This is a setting of the underlying RabbitMQ driver that SMB uses.

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.ConnectionFactory.ConsumerDispatchConcurrency = 2; // default is 1
        // ...
    });
});

Notice that increasing concurrency will cause more messages to be processed at the same time within one service instance, hence affecting order of consumption. In scenarios where order of consumption is important, you may want to keep concurrency levels set to 1.

Request-Response

Here is an example of how to set up a request-response flow over RabbitMQ. The fanout exchange type is used here, but other types could be used as well (although we might then have to provide a routing key provider on the producer side).

services.AddSlimMessageBus((mbb) =>
{
    // ...
    mbb.Produce<EchoRequest>(x =>
    {
        // The requests should be sent to the "test-echo" exchange
        x.Exchange("test-echo", exchangeType: ExchangeType.Fanout);
    })
    .Handle<EchoRequest, EchoResponse>(x => x
        // Declare the queue for the handler
        .Queue("echo-request-handler")
        // Bind the queue to the "test-echo" exchange
        .ExchangeBinding("test-echo")
        // If the request handling fails, the failed messages will be routed to the DLQ exchange
        .DeadLetterExchange("echo-request-handler-dlq")
        .WithHandler<EchoRequestHandler>())
    .ExpectRequestResponses(x =>
    {
        // Tell the handler which exchange to send the responses to
        x.ReplyToExchange("test-echo-resp", ExchangeType.Fanout);
        // Which queue to use to read responses from
        x.Queue("test-echo-resp-queue");
        // Bind to the reply to exchange
        x.ExchangeBinding();
        // Timeout if the response doesn't arrive within 60 seconds
        x.DefaultTimeout(TimeSpan.FromSeconds(60));
    });
});
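With the configuration above in place, the request, response and handler types and a calling service might be sketched as follows. This assumes the SMB v2-style IRequest<TResponse> and IRequestHandler<TRequest, TResponse>.OnHandle(TRequest) contracts (newer versions may add a CancellationToken parameter). The message shapes and the EchoClient class are hypothetical.

```csharp
using System.Threading.Tasks;
using SlimMessageBus;

// Hypothetical message shapes, for illustration only
public record EchoRequest(string Message) : IRequest<EchoResponse>;
public record EchoResponse(string Message);

// Handles requests arriving on the "echo-request-handler" queue
public class EchoRequestHandler : IRequestHandler<EchoRequest, EchoResponse>
{
    public Task<EchoResponse> OnHandle(EchoRequest request)
        => Task.FromResult(new EchoResponse(request.Message));
}

// Hypothetical caller that sends a request and awaits the response
public class EchoClient
{
    private readonly IMessageBus _bus;

    public EchoClient(IMessageBus bus) => _bus = bus;

    public async Task<string> Echo(string text)
    {
        // Completes when the response arrives, or fails once the configured timeout elapses
        var response = await _bus.Send(new EchoRequest(text));
        return response.Message;
    }
}
```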

Topology Provisioning

SMB automatically creates exchanges from the producer declarations, and queues, dead letter exchanges and bindings from the consumer declarations.

However, if you need to layer on other topology elements (or perform cleanup), this can be achieved with UseTopologyInitializer():

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
        {
            // perform some cleanup if needed
            channel.QueueDelete("subscriber-0", ifUnused: true, ifEmpty: false);
            channel.QueueDelete("subscriber-1", ifUnused: true, ifEmpty: false);
            channel.ExchangeDelete("test-ping", ifUnused: true);
            channel.ExchangeDelete("subscriber-dlq", ifUnused: true);

            // apply default SMB inferred topology
            applyDefaultTopology();
        });
    });
});

Omitting the call to applyDefaultTopology() suppresses the creation of the SMB inferred topology. This might be useful when the SMB inferred topology is not desired or there are other custom needs.
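As an illustration of layering on extra topology elements, the sketch below adds a queue bound to the dead letter exchange, so that dead-lettered messages are retained rather than dropped by an exchange with no bound queue. It assumes the channel exposes the synchronous RabbitMQ.Client API, as in the sample above, and that the subscriber-dlq exchange is created by the default topology (that is, its exchange type was specified). The queue name subscriber-dlq-queue is hypothetical, and the empty binding key is an assumption that must match the routing key the dead-lettered messages actually carry.

```csharp
using SlimMessageBus.Host.RabbitMQ;

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
        {
            // First let SMB create the inferred exchanges, queues and bindings
            applyDefaultTopology();

            // Then add a durable queue that retains dead-lettered messages (hypothetical name)
            channel.QueueDeclare(queue: "subscriber-dlq-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

            // Bind it to the dead letter exchange (binding key is an assumption)
            channel.QueueBind(queue: "subscriber-dlq-queue", exchange: "subscriber-dlq", routingKey: string.Empty, arguments: null);
        });
    });
});
```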

Not Supported

Feedback

Open a GitHub issue if you need a feature, have a suggestion for improvement, or want to contribute an enhancement.