SlimMessageBus
SlimMessageBus is a client façade for message brokers for .NET. It comes with implementations for specific brokers (RabbitMQ, Kafka, Azure EventHub, MQTT, Redis Pub/Sub) and in-memory message passing (in-process communication). SlimMessageBus additionally provides request-response implementation over message queues.
The v2 release is available (see migration guide). The v3 release is under construction.
- Key elements of SlimMessageBus
- Docs
- Packages
- Samples
- Features
- Principles
- License
- Build
- Testing
- Credits
Key elements of SlimMessageBus
- Consumers:
IConsumer<in TMessage>
- subscriber in pub/sub (or queue consumer)IRequestHandler<in TRequest, TResponse>
&IRequestHandler<in TRequest>
- request handler in request-response
- Producers:
IPublishBus
- publisher in pub/sub (or queue producer)IRequestResponseBus
- sender in req/respIMessageBus
- extendsIPublishBus
andIRequestResponseBus
- Misc:
IRequest<out TResponse>
&IRequest
- a marker for request messagesMessageBus
- static accessor for current contextIMessageBus
Docs
- Introduction
- Providers:
- Plugins:
Packages
Name | Description | NuGet |
---|---|---|
SlimMessageBus |
The core API for SlimMessageBus | |
Transport providers | ||
.Host.AzureEventHub |
Transport provider for Azure Event Hubs | |
.Host.AzureServiceBus |
Transport provider for Azure Service Bus | |
.Host.Kafka |
Transport provider for Apache Kafka | |
.Host.Memory |
Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | |
.Host.MQTT |
Transport provider for MQTT | |
.Host.NATS |
Transport provider for NATS | |
.Host.RabbitMQ |
Transport provider for RabbitMQ | |
.Host.Redis |
Transport provider for Redis | |
.Host.Sql (pending) |
Transport provider implementation for SQL database message passing | |
Serialization | ||
.Host.Serialization.Json |
Serialization plugin for JSON (Newtonsoft.Json library) | |
.Host.Serialization.SystemTextJson |
Serialization plugin for JSON (System.Text.Json library) | |
.Host.Serialization.Avro |
Serialization plugin for Avro (Apache.Avro library) | |
.Host.Serialization.Hybrid |
Plugin that delegates serialization to other serializers based on message type | |
.Host.Serialization.GoogleProtobuf |
Serialization plugin for Google Protobuf | |
Plugins | ||
.Host.AspNetCore |
Integration for ASP.NET Core | |
.Host.Interceptor |
Core interface for interceptors | |
.Host.FluentValidation |
Validation for messages based on FluentValidation | |
.Host.Outbox.Sql |
Transactional Outbox using SQL | |
.Host.Outbox.DbContext |
Transactional Outbox using EF DbContext | |
.Host.AsyncApi |
AsyncAPI specification generation via Saunter |
Typically the application layers (domain model, business logic) only need to depend on SlimMessageBus
which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (SlimMessageBus.Host.*
) which are the messaging transport providers and additional plugins.
Samples
Basic usage
Some service (or domain layer) publishes a message:
IMessageBus bus; // injected
await bus.Publish(new SomeMessage());
Another service (or application layer) handles the message:
public class SomeMessageConsumer : IConsumer<SomeMessage>
{
public async Task OnHandle(SomeMessage message)
{
// handle the message
}
}
Note: It is also possible to avoid having to implement the interface
IConsumer<T>
(see here).
The bus also supports request-response implemented via queues, topics or in-memory - depending on the chosen transport provider. The sender side sends a request message:
var response = await bus.Send(new SomeRequest());
Note: It is possible to configure the bus to timeout a request when the response does not arrive within the allotted time (see here).
The receiving side handles the request and replies:
public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
public async Task<SomeResponse> OnHandle(SomeRequest request)
{
// handle the request message and return a response
return new SomeResponse { /* ... */ };
}
}
The bus will ask the DI container to provide the consumer instances (SomeMessageConsumer
, SomeRequestHandler
).
There is also support for one-way request-response.
Configuration
The Microsoft.Extensions.DependencyInjection
is used to compose the bus:
// IServiceCollection services;
services.AddSlimMessageBus(mbb =>
{
mbb
// First child bus - in this example Kafka transport
.AddChildBus("Bus1", (builder) =>
{
builder
.Produce<SomeMessage>(x => x.DefaultTopic("some-topic"))
.Consume<SomeMessage>(x => x.Topic("some-topic")
//.WithConsumer<SomeMessageConsumer>() // Optional: can be skipped as IConsumer<SomeMessage> will be resolved from DI
//.KafkaGroup("some-kafka-consumer-group") // Kafka: Consumer Group
//.SubscriptionName("some-azure-sb-topic-subscription") // Azure ServiceBus: Subscription Name
);
// ...
// Use Kafka transport provider (requires SlimMessageBus.Host.Kafka package)
.WithProviderKafka(cfg => { cfg.BrokerList = "localhost:9092"; }); // requires SlimMessageBus.Host.Kafka package
// Use Azure Service Bus transport provider
//.WithProviderServiceBus(cfg => { ... }) // requires SlimMessageBus.Host.AzureServiceBus package
// Use Azure Event Hub transport provider
//.WithProviderEventHub(cfg => { ... }) // requires SlimMessageBus.Host.AzureEventHub package
// Use Redis transport provider
//.WithProviderRedis(cfg => { ... }) // requires SlimMessageBus.Host.Redis package
// Use RabbitMQ transport provider
//.WithProviderRabbitMQ(cfg => { ... }) // requires SlimMessageBus.Host.RabbitMQ package
// Use in-memory transport provider
//.WithProviderMemory(cfg => { ... }) // requires SlimMessageBus.Host.Memory package
})
// Add other bus transports (as child bus), if needed
//.AddChildBus("Bus2", (builder) => { })
// Scan assembly for consumers, handlers, interceptors, and register into MSDI
.AddServicesFromAssemblyContaining<SomeMessageConsumer>()
//.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
// Add JSON serializer
.AddJsonSerializer(); // requires SlimMessageBus.Host.Serialization.Json or SlimMessageBus.Host.Serialization.SystemTextJson package
});
The configuration can be modularized.
Use Case: Domain Events (in-process pub/sub messaging)
This example shows how SlimMessageBus
and SlimMessageBus.Host.Memory
can be used to implement the Domain Events pattern.
The provider passes messages in the same process (no external message broker is required).
The domain event is a simple POCO:
// domain event
public record OrderSubmittedEvent(Order Order, DateTime Timestamp);
The domain event handler implements the IConsumer<T>
interface:
// domain event handler
public class OrderSubmittedHandler : IConsumer<OrderSubmittedEvent>
{
public Task OnHandle(OrderSubmittedEvent e)
{
// ...
}
}
The domain event handler (consumer) is obtained from the MSDI at the time of event publication. The event publish enlists in the ongoing scope (web request scope, external message scope of the ongoing message).
In the domain model layer, the domain event gets raised:
// aggregate root
public class Order
{
public Customer Customer { get; }
public OrderState State { get; private set; }
private IList<OrderLine> lines = new List<OrderLine>();
public IEnumerable<OrderLine> Lines => lines.AsEnumerable();
public Order(Customer customer)
{
Customer = customer;
State = OrderState.New;
}
public OrderLine Add(string productId, int quantity) { }
public Task Submit()
{
State = OrderState.Submitted;
// Raise domain event
return MessageBus.Current.Publish(new OrderSubmittedEvent(this));
}
}
Sample logic executed by the client of the domain model:
var john = new Customer("John", "Whick");
var order = new Order(john);
order.Add("id_machine_gun", 2);
order.Add("id_grenade", 4);
await order.Submit(); // events fired here
Notice the static MessageBus.Current
property is configured to resolve a scoped IMessageBus
instance (web request-scoped or pick-up message scope from a currently processed message).
The SlimMessageBus
configuration for the in-memory provider looks like this:
//IServiceCollection services;
// Configure the message bus
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderMemory();
// Find types that implement IConsumer<T> and IRequestHandler<T, R> and declare producers and consumers on the mbb
mbb.AutoDeclareFrom(Assembly.GetExecutingAssembly());
// Scan assembly for consumers, handlers, interceptors, and register into MSDI
mbb.AddServicesFromAssemblyContaining<OrderSubmittedHandler>();
});
For the ASP.NET project, set up the MessageBus.Current
helper (if you want to use it, and pick up the current web-request scope):
services.AddSlimMessageBus(mbb =>
{
// ...
mbb.AddAspNet(); // requires SlimMessageBus.Host.AspNetCore package
});
services.AddHttpContextAccessor(); // This is required by the SlimMessageBus.Host.AspNetCore plugin
See the complete sample for ASP.NET Core where the handler and bus are web-request scoped.
Use Case: MediatR replacement
The SlimMessageBus in-memory provider can replace the need to use MediatR library:
- It has similar semantics and has the interceptor pipeline enabling the addition of custom behavior.
- The generic interceptors can introduce common behavior like logging, authorization or audit of messages.
- The FluentValidation plugin can introduce request/command/query validation.
- The external communication can be layered on top of SlimMessageBus which allows having one library for in-memory and out-of-process messaging (Hybrid Provider).
See the CQRS and FluentValidation samples.
Use Case: Request-response over Kafka topics
See sample.
Features
- Types of messaging patterns supported:
- Publish-subscribe
- Request-response
- Queues
- A hybrid of the above (e.g. Kafka with multiple topic consumers in one group)
- Modern async/await syntax and TPL
- Fluent configuration
- SourceLink support
- Because SlimMessageBus is a facade, chosen messaging transports can be swapped without impacting the overall application architecture.
Principles
- The core of
SlimMessageBus
is “slim”- Simple, common and friendly API to work with messaging systems
- No external dependencies.
- The core interface can be used in the domain model (e.g. Domain Events)
- Plugin architecture:
- Message serialization (JSON, Avro, Protobuf)
- Use your favorite messaging broker as a provider by simply pulling a NuGet package
- Add transactional outbox pattern or message validation
- No threads created (pure TPL)
- Async/Await support
- Fluent configuration
- Logging is done via
Microsoft.Extensions.Logging.Abstractions
so that you can connect to your favorite logger provider.
License
Build
cd src
dotnet build
dotnet pack --output ../dist
NuGet packages end up in dist
folder
Testing
To run tests you need to update the secrets.txt
to match your cloud infrastructure or local infrastructure.
SMB has some message brokers set up on Azure for integration tests (secrets not shared).
Run all tests:
dotnet test
Run all tests except integration tests that require local/cloud infrastructure:
dotnet test --filter Category!=Integration
Credits
Thanks to Gravity9 for providing an Azure subscription that allows running the integration test infrastructure.
Thanks to the following service cloud providers for providing free instances for our integration tests:
- Redis - Redis Labs
- Kafka - CloudKarafka
- MQTT - HiveMQ
- RabbitMQ - CloudAMQP
If you want to help and sponsor, please write to me.