Skip to the content.

Azure Event Hub Provider for SlimMessageBus

Please read the Introduction before reading this provider documentation.

Configuration

Azure Event Hub provider requires a connection string to the event hub:

var connectionString = ""; // Azure Event Hubs connection string
var storageConnectionString = ""; // Azure Storage Account connection string (for the consumer group to store last checkpointed offset of each topic-partition)
var storageContainerName = ""; // Azure Blob Storage container name

services.AddSlimMessageBus(mbb =>
{
    // Use Azure Event Hub as provider    
    mbb.WithProviderEventHub(cfg =>
    {
        cfg.ConnectionString = eventHubConnectionString;
        cfg.StorageConnectionString = storageConnectionString;
        cfg.StorageBlobContainerName = storageContainerName;
    });    
    mbb.AddJsonSerializer();

    // ...
});

If your bus only produces messages to Event Hub and does not consume any messages, then you do not need to provide a storage account as part of the config. In such case, the storage account details can be omitted:

var connectionString = ""; // Azure Event Hubs connection string

services.AddSlimMessageBus(mbb =>
{
    mbb.WithProviderEventHub(cfg =>
    {
        cfg.ConnectionString = eventHubConnectionString;
        // The bus will only be used to publish messages to Azure Event Hub - no need to set the storage account details for consumer leases
        //cfg.StorageConnectionString = storageConnectionString;
        //cfg.StorageBlobContainerName = storageContainerName;
    });
    mbb.AddJsonSerializer();
})

The blob storage container will be created if it does not exist. Therefore, ensure the storage account connection string has sufficient permissions or create the storage container ahead of the application start.

Advanced settings

There are additional configuration options from the underlying AEH client available that can be used to further tweak the client behavior. Here is an example:

services.AddSlimMessageBus(mbb =>
{
    mbb.WithProviderEventHub(cfg =>
    {
        cfg.ConnectionString = eventHubConnectionString;
        cfg.StorageConnectionString = storageConnectionString;
        cfg.StorageBlobContainerName = storageContainerName;

        // More advanced settings can be changed on the underlying AEH client
        cfg.EventHubProducerClientOptionsFactory = (path) => new Azure.Messaging.EventHubs.Producer.EventHubProducerClientOptions
        {
            Identifier = $"MyService_{Guid.NewGuid()}"
        };
        
        cfg.EventHubProcessorClientOptionsFactory = (consumerParams) => new Azure.Messaging.EventHubs.EventProcessorClientOptions
        {
            // Force partition lease rebalancing to happen faster (if new consumers join they can quickly gain a partition lease)
            LoadBalancingUpdateInterval = TimeSpan.FromSeconds(2),
            PartitionOwnershipExpirationInterval = TimeSpan.FromSeconds(5),
        };
    });    
});

Producing Messages

To produce a given TMessage to an Azure Event Hub named my-event-hub use:

// send TMessage to Azure SB queues
mbb.Produce<TMessage>(x => x.DefaultPath("my-event-hub")); 

Selecting message partition

Azure EventHub topics are broken into partitions:

SMB Azure EventHub allows to set a provider (selector) that will assign the partition key for a given message. Here is an example:

mbb.Produce<CustomerUpdated>(x => 
{
    x.DefaultPath("topic1");
    // Message key could be set for the message
    x.KeyProvider((message) => message.CustomerId.ToString());
});

The partition key value is a string for AEH.

There is also an alias EhKeyProvider that might be useful in case the hybrid bus is used with multiple providers that might have an overlapping extension method name.

Consuming Messages

Azure Event Hub requires a consumer group name to be provided along with the event hub name:

mbb.Consume<SomeMessage>(x => x
    .Path(hubName) // hub name
    .Group(consumerGroupName) // consumer group name on the hub
    .WithConsumer<SomeConsumer>());

Checkpointing offsets

Azure Event Hub client needs to store the last offset for a partition / hub name / consumer group, so that when the app restarts it knows where to resume message consumption from. That is checkpointing. Here are some additional configuration options:

mbb.Consume<SomeMessage>(x => x
    .Path(hubName) // hub name
    .Group(consumerGroupName) // consumer group name on the hub
    .WithConsumer<SomeConsumer>()
    .CheckpointAfter(TimeSpan.FromSeconds(10)) // trigger checkpoint after 10 seconds 
    .CheckpointEvery(50)); // trigger checkpoint every 50 messages

When the service checkpoints are often, this will impact performance/throughput (more round trips to Azure Blob Storage to save the offsets). In contrast, when the service checkpoints are too rare, then the probability for a message retry (if the lease expires or your services crashes) increases. As with everything, this needs to be tweaked to achieve a balance.

Since version 1.16.0 the transport has moved to the new AEH client library. Because of this, the consumer offsets stored in Azure Blob Storage are no longer compatible. See the migration path from Microsoft here.