Samples
These are some selected samples for SlimMessageBus.
Sample.Simple.ConsoleApp
This is the simplest, one-class code example. It shows how it is easy to change providers in one place while having the rest of the code intact.
Sample.DomainEvents
This sample shows how SlimMessageBus
can be used to implement domain events.
Sample.DomainEvents.Domain
project is the domain model that has the OrderSubmittedEvent
. The domain layer has a dependency on SlimMessageBus
to be able to publish domain events.
Sample.DomainEvents.Application
implements the application logic and has the handler OrderSubmittedHandler
for the domain event (which implements the IConsumer<OrderSubmittedEvent>
).
Sample.DomainEvents.WebApi
project is an ASP.NET Core 5.0 project that configures the SlimMessageBus.Host.Memory
to enable in-process message passing.
Notice that the MessageBus.Current
will resolve the IMessageBus
instance from the current web request scope. Each handler instance will be scoped to the web request as well.
The MessageBus instance is a web request scoped. The scope could as well be a singleton.
Run the WebApi project and POST (without any payload) to https://localhost:5001/api/orders
. An order will be submitted:
2018-12-09 23:06:34.4667|INFO|Sample.DomainEvents.Domain.OrderSubmittedHandler|Customer John Whick just placed an order for:
2018-12-09 23:06:34.4667|INFO|Sample.DomainEvents.Domain.OrderSubmittedHandler|- 2x id_machine_gun
2018-12-09 23:06:34.4749|INFO|Sample.DomainEvents.Domain.OrderSubmittedHandler|- 4x id_grenade
2018-12-09 23:06:34.4749|INFO|Sample.DomainEvents.Domain.OrderSubmittedHandler|Generating a shipping order...
Sample.Images
Use case:
- Some front-end web app needs to display downsized image (thumbnails) of large images to speed up the page load.
- The thumbnails are requested in the WebApi and are generated on demand (and cached to disk) by the Worker (unless they exist already).
- WebApi and Worker exchange messages via Apache Kafka
- Worker can be scaled out (more instances, more Kafka partitions)
The sample project uses request-response to generate image thumbnails. It consists of two main applications:
- WebApi (ASP.NET WebApi)
- Worker (.NET Console App)
The WebApi serves thumbnails of an original image given the desired Width x Height. To request a thumbnail of size 120x80
of the image DSC0843.jpg
use:
https://localhost:56788/api/image/DSC3781.jpg/r/?w=120&h=80&mode=1
The thumbnail generation happens on the Worker. Because the image resizing is an CPU/memory intensive operation, the number of workers can be scaled out as the load increases.
The original images and produced thumbnails cache reside on disk in the folder: .\SlimMessageBus\src\Samples\Content\
To obtain the original image use:
https://localhost:56788/api/image/DSC3781.jpg
When a thumbnail of the specified size already exists it will be served by WebApi, otherwise a request message is sent to Worker to perform processing. When the Worker generates the thumbnail it responds with a response message.
Sequence diagram
Key snippet
The front-end web app makes a call to resize an image DSC0862.jpg
to 120x80
resolution, by using this URL:
https://localhost:56788/api/image/DSC3781.jpg/r/?w=120&h=80&mode=1
This gets handled by the WebApi method of the ImageController
private readonly IRequestResponseBus _bus;
// ...
[Route("{fileId}")]
public async Task<HttpResponseMessage> GetImageThumbnail(string fileId, ThumbnailMode mode, int w, int h)
{
var thumbFileContent = // ... try to load content for the desired thumbnail w/h/mode/fileId
if (thumbFileContent == null)
{
// Task will await until response comes back (or timeout happens). The HTTP request will be queued and IIS processing thread released.
var thumbGenResponse = await _bus.Send(new GenerateThumbnailRequest(fileId, mode, w, h));
thumbFileContent = await _fileStore.GetFile(thumbGenResponse.FileId);
}
return ServeStream(thumbFileContent);
}
The GenerateThumbnailRequest
request is handled by a handler in one of the pool of Worker console apps.
public class GenerateThumbnailRequestHandler : IRequestHandler<GenerateThumbnailRequest, GenerateThumbnailResponse>
{
public Task<GenerateThumbnailResponse> OnHandle(GenerateThumbnailRequest request)
{
// some processing
return new GenerateThumbnailResponse { FileId = thumbnailFileId };
}
}
The response gets replied to the originating WebApi instance and the Task<GenerateThumbnailResponse>
resolves causing the queued HTTP request to serve the resized image thumbnail.
var thumbGenResponse = await _bus.Send(new GenerateThumbnailRequest(fileId, mode, w, h));
The message bus configuration for the WebApi:
services.AddSlimMessageBus((mbb, svp) =>
{
// unique id across instances of this application (e.g. 1, 2, 3)
var instanceId = Configuration["InstanceId"];
var kafkaBrokers = Configuration["Kafka:Brokers"];
var instanceGroup = $"webapi-{instanceId}";
var instanceReplyTo = $"webapi-{instanceId}-response";
mbb
.Produce<GenerateThumbnailRequest>(x =>
{
// Default response timeout for this request type
//x.DefaultTimeout(TimeSpan.FromSeconds(10));
x.DefaultTopic("thumbnail-generation");
})
.ExpectRequestResponses(x =>
{
x.ReplyToTopic(instanceReplyTo);
x.KafkaGroup(instanceGroup);
// Default global response timeout
x.DefaultTimeout(TimeSpan.FromSeconds(30));
})
.WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers));
});
});
services.AddMessageBusJsonSerializer(();
services.AddMessageBusServicesFromAssembly(Assembly.GetExecutingAssembly());
services.AddHttpContextAccessor(); // This is required for the SlimMessageBus.Host.AspNetCore plugin
The message bus configuration for the Worker:
// This sample uses Autofac
var builder = new ContainerBuilder();
services.AddSlimMessageBus((mbb, svp) =>
{
// unique id across instances of this application (e.g. 1, 2, 3)
var instanceId = configuration["InstanceId"];
var kafkaBrokers = configuration["Kafka:Brokers"];
var instanceGroup = $"worker-{instanceId}";
var sharedGroup = "workers";
mbb
.Handle<GenerateThumbnailRequest, GenerateThumbnailResponse>(s =>
{
s.Topic("thumbnail-generation", t =>
{
t.WithHandler<GenerateThumbnailRequestHandler>()
.KafkaGroup(sharedGroup)
.Instances(3);
});
})
.WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
{
ConsumerConfig = (config) =>
{
config.StatisticsIntervalMs = 60000;
config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest;
}
});
});
Because topics are partitioned in Kafka, requests originating from WebApi instances will be distributed across all Worker instances. However, to fine tune this, message key providers should be configured (see Kafka provider wiki and samples).
Sample.Serialization
The Sample.Serialization.ConsoleApp is a simple console app that shows different serializer plugins and how to use them. Additionally, the Sample.Serialization.MessagesAvro project has a sample Avro IDL/Contract from which C# message DTOs are generated.