Welcome to KnightBus’s documentation!

KnightBus is a fast, lightweight and extensible messaging framework that supports multiple active messaging transports

When building BookBeat we soon discovered that there was no silver bullet messaging technology, each one had its own pros and cons. Reliability, performance, latency, scalability, pricing and capabilites made us build KnightBus so that we could choose transport on a per message basis.

Features:

  • Multiple Transports, active simultaneously on per message basis
  • Middleware, write your own middleware to implement custom features
  • Attachments, attach large files to your messages, transport independent
  • Singleton Processing, make sure only one message is processed at a time regardless of number of instances running
  • Throttling, both global and per message
  • IoC, bring you own or use the default

Quick Start

The goal with Knightbus was to build a fast and simple messaging library that supports having multiple active messaging transports at the same time. There are many messaging frameworks available, but most of them are very complex and only support one active message transport, forcing developers to choose a fit-all messaging stack.

Installation

Find the official KnightBus packages at NuGet.org : https://www.nuget.org/profiles/BookBeat

Processing Messages

public class CommandProcessor : IProcessCommand<SampleCommand, SampleSettings>,
{
    public CommandProcessor(ISomeDependency dependency)
    {
        //You can use bring your own container for dependency injection
    }

    public Task ProcessAsync(SampleCommand message, CancellationToken cancellationToken)
    {
        //Your code goes here
        return Task.CompletedTask;
    }
}

public class SampleCommand : IServiceBusCommand
{
    public string Message { get; set; }
}

public class SampleCommandMapping : IMessageMapping<SampleCommand>
{
    public string QueueName => "your-queue-name";
}

public class SampleSettings : IProcessingSettings
{
    //These settings can be re-used by multiple message processors
    public int MaxConcurrentCalls => 100; //How many concurrent messages do you want to process
    public TimeSpan MessageLockTimeout => TimeSpan.FromMinutes(1); //How long should you process before considering the message hung
    public int DeadLetterDeliveryLimit => 1; //How many retries before dead lettering
    public int PrefetchCount => 50; //Increase perfomance by fetching more messages at once
}

Sending Messages

var client = new ServiceBus(new ServiceBusConfiguration(serviceBusConnection));
//Send some Commands
for (var i = 0; i < 10; i++)
{
    await client.SendAsync(new SampleCommand { Message = "Hello from message " + i.ToString() });
}

Examples

You can find all current examples at our GitHub repository https://github.com/BookBeat/knightbus/tree/master/knightbus/examples

KnightBus Host

The KnightBus host is the part of KnightBus responsible for invoking your MessageProcessors with the message sent on the transport. All configuration options for the host is exposed through the IHostConfiguration property from Configure()

Middlewares

Middlewares enable you to easily control what happens when processing messages. Much of KnightBus’s internal mechanisms are implemented as Middlewares. Middlewares can be attached at either host or transport level. A host attached Middleware will be invoked for all transports where as a transport attached middleware will be specific for the transport only.

Default Middlewares

  • ErrorHandlingMiddleware - Makes sure all exceptions are caught, logged and that the message is marked as failed. This Middleware is always added as the first Middleware.
  • DeadLetterMiddleware - Dead letters messages that have failed to many times.

Optional Included Middlewares

  • ThrottlingMiddleware - Allows throttling number of concurrent messages.
  • AttachmentMiddleware - Enables the use of message attachments.
  • ExtendMessageLockDurationMiddleware - Enables automatic message lock extension. Useful for very long running messages where you don’t want to take an extremly long lock directly on the transport. Currently only works with Azure Storage Queues. Enable by having your ProcessingSetting implement IExtendMessageLockTimeout.

Pipeline

All Middlewares are executed in a Pipeline where the first component always is the ErrorHandlingMiddleware and the last is your actual implementation of the MessageProcessor

Since the Middlewares are executed in order, it is important to supply them in the order you want to execute them.

Writing your own Middleware

It’s really easy to write your own custom Middleware, just implement the IMessageProcessorMiddleware and add the Middleware to the Pipeline on either host or transport level. Custom logging and performance monitoring are obvious use-cases.

public class MyCustomMiddleware : IMessageProcessorMiddleware
{
    public async Task ProcessAsync<T>(IMessageStateHandler<T> messageStateHandler, IPipelineInformation pipelineInformation, IMessageProcessor next, CancellationToken cancellationToken) where T : class, IMessage
    {
        Console.WriteLine("This is before the next step in the Pipeline");
        await next.ProcessAsync(messageStateHandler, cancellationToken).ConfigureAwait(false); //Call the next Middleware in the Pipeline
        Console.WriteLine("This is after all later Middlewares have finished");
    }
}

Logging

You can bring your own logging framwork or use the supplied Microsoft.Abstractions implementation.

Dependency Injection

KnightBus supports using your own IoC container or you can use the supplied Microsoft.Abstractions implementation.

Messages are lifestyle scoped per-message.

Singleton Locks

MessageProcessors can be marked as Singleton. This is done using the marker interface ISingletonProcessor.

A MessageProcessor marked with ISingletonProcessor will only run on one instance regardless of how you scale and will only process messages one at a time.

public class SingletonCommandProcessor : IProcessCommand<SingletonCommand, Settings>, ISingletonProcessor
{
    public Task ProcessAsync(SingletonCommand message, CancellationToken cancellationToken)
    {
        //This code will never run concurrently
        return Task.CompletedTask;
    }
}

Since the SingletonLock is held globally a distributed locking mechanism must be supplied. By default KnightBus comes with an Azure implementation using Blob Storage leases.

To enable Singleton MessageProcessors simple supply the host with an ISingletonLockManager.

Hosting

The KnightBus Host can be hosted anywhere where you can run Console Applications. Currently all of our KnightBus Hosts are deployed using Kubernetes Pods, Azure WebJobs and TopShelf Windows Services.

Messages

Everything processed using KnightBus is a considered a message and implements the IMessage interface. Messages are transport dependent, and need to implement the proper interface for the transport. All message haves a 1:1 relationship with a specific queue or topic.

Commands

Commands are messages that have a single recipient and is commanding the recipient to perform a task.

All commands implement ICommand through the specific transport implementation, for instance to send a command using the Azure Service Bus, you must implement IServiceBusCommand

Events

Events are messages that have a single dispatcher and multiple recievers and is telling the recievers that something has happened.

All events implement IEvent through the specific transport implementation, for instance to publish an event using the Azure Service Bus, you must implement IServiceBusEvent

Message Mappings

All Messages must also have a corresponding MessageMapping. KnightBus uses this to determine where to send and recieve the message. You implement this simply by adding your own implementation of IMessageMapping<MessageToMap>. When sending receiving messages KnightBus automatically finds these mappings.

public class MyMessage : IServiceBusCommand
{
    public string Message { get; set; }
}

public class MyMessageMapping : IMessageMapping<MyMessage>
{
    public string QueueName => "some-queue-name";
}

The mapping class must reside in the same assembly as the message that is’s mapping.

Sending Messages

To send a message you need to use the client for the specific message transport.

var serviceBusClient = new ServiceBus(new ServiceBusConfiguration("connectionString"));
var storageBusClient = new StorageBus(new StorageBusConfiguration("connectionString"));

await serviceBusClient.SendAsync(new MyServiceBusMessage());
await storageBusClient.SendAsync(new MyStorageBusMessage());

Custom Message Serialization

By default KnightBus uses Microsoft.Text.Json for message serialization. To override you can either change it per transport or for a specific message. To override for a specific Message set it on the message mapping.

public class CommandMapping : IMessageMapping<SomeCommand>, ICustomMessageSerializer
{
    public string QueueName => "testcommand";
    public IMessageSerializer MessageSerializer => new ProtobufNetSerializer();
}

Message Attachments

Regardless of the transport you can attach large files as attachments. Simply implement the ICommandWithAttachment interface on your command. The default implementation is for Azure Blob Storage using the BlobStorageMessageAttachmentProvider class which needs to be configured on the ITransportConfiguration supplied to the client and the host.

You can write your own implementation by implementing the IMessageAttachmentProvider interface. IMessageAttachmentProvider is separated by transport so you can use different attachment providers at the same time.

public class MyMessage : IServiceBusCommand, ICommandWithAttachment
{
    public string Message { get; set; }
    public IMessageAttachment Attachment { get; set; } //Here you can access the attached file
}

Using Azure ServiceBus Creation Options Overrides For Queue/Topic

To tell the Azure ServiceBus queue/topic to override default creation options, add IServiceBusCreationOptions to IMessageMapping implementation.

public class MyMessage : IServiceBusCommand
{
    public string Message { get; set; }
}

public class MyMessageMapping : IMessageMapping<MyMessage>, IServiceBusCreationOptions
{
    public string QueueName => "your-queue";

    public bool EnablePartitioning => true;
    public bool SupportOrdering => false;
    public bool EnableBatchedOperations => true;
}

Transports

Transports determine how your message is transported from Client to Processor. All current transports are listed here, but it’s also fairly easy to extend KnightBus with other transports. A central concept is that each message is tagged with the transport it should use, for instance if you want to send a Command over Azure ServiceBus, implement the interface IServiceBusCommand.

Transport Configuration

TODO

Azure Storage Bus

  • commands
  • attachments
  • saga store
  • singleton locks

https://azure.microsoft.com/en-us/services/storage/queues/

Redis

  • commands
  • events
  • attachments
  • saga store

The Redis transport supports both commands and events and is a very high performance transport.

KnightBus implements the Circular list pattern where all messages sent are stored on a list and when processed they are moved to another list making sure that no messages are lost in transit.

Pub/Sub is enabled by that the clients will find out what subscribers exists and publish to specific queue. An event that has three listeners will be published to three separed queues.

Sagas

Sagas add state to your messagaging and allows you to handle long running processes within the same code. A Saga is a long lived transaction that is started by one or more specific messages and is finished by one or more messages. The messages can be either events or commands.

Setup

Setup KnightBus for Sagas by registering it in the host:

var knightBusHost = new KnightBusHost()
//Enable the StorageBus Transport
.UseTransport(new StorageTransport(storageConnection)
.Configure(configuration => configuration
    //Register our message processors without IoC using the standard provider
    .UseMessageProcessorProvider(new StandardMessageProcessorProvider()
        .RegisterProcessor(new SampleSagaMessageProcessor(client))
    )
    //Enable Saga support using the table storage Saga store
    .EnableSagas(new StorageTableSagaStore(storageConnection))
);

await knightBusHost.StartAsync();

Sample Implementation

class SampleSagaMessageProcessor: Saga<MySagaData>,
        IProcessCommand<SampleSagaMessage, SomeProcessingSetting>,
        IProcessCommand<SampleSagaStartMessage, SomeProcessingSetting>,
        ISagaDuplicateDetected<SampleSagaMessage>
    {
        private readonly IStorageBus _storageBus;

        public SampleSagaMessageProcessor(IStorageBus storageBus)
        {
            _storageBus = storageBus;
            //Map the messages to the Saga
            MessageMapper.MapStartMessage<SampleSagaStartMessage>(m=> m.Id);
            MessageMapper.MapMessage<SampleSagaMessage>(m=> m.Id);
        }
        public override string PartitionKey => "sample-saga";
        public async Task ProcessAsync(SampleSagaStartMessage message, CancellationToken cancellationToken)
        {
            Console.WriteLine(message.Message);
            await _storageBus.SendAsync(new SampleSagaMessage());
        }

        public async Task ProcessAsync(SampleSagaMessage message, CancellationToken cancellationToken)
        {
            Console.WriteLine("Counter is {0}", Data.Counter);
            if (Data.Counter == 5)
            {
                Console.WriteLine("Finishing Saga");
                await CompleteAsync();
                return;
            }

            Data.Counter++;
            await UpdateAsync();
            await _storageBus.SendAsync(new SampleSagaMessage());
        }

        public Task ProcessDuplicateAsync(SampleSagaMessage message, CancellationToken cancellationToken)
        {
            // Saga duplicate found

            return Task.CompletedTask;
        }
    }

    // This is exposed as `Data` property to classes that implements Saga<MySagaData>
    class MySagaData
    {
        public int Counter { get; set; }
    }

    class SomeProcessingSetting : IProcessingSettings
    {
        public int MaxConcurrentCalls => 1;
        public int PrefetchCount => 1;
        public TimeSpan MessageLockTimeout => TimeSpan.FromMinutes(5);
        public int DeadLetterDeliveryLimit => 2;
    }

Versioning

KnightBus uses Semantic Versioning to make sure that it’s easy for developers to understand the impact of new versions.

Read more about it here: Semantic Versioning

Monitoring

Using the middleware pattern KnightBus can monitor message processing with any tool you’d like.

Already available monitoring middlewares

  • New Relic
  • Application Insights

New Relic

Install the package KnightBus.NewRelic and configure KnightBusHost to use New Relic.

var knightBus = new KnightBusHost()
    .UseTransport(...)
    .Configure(configuration => configuration
        .UseNewRelic()
        ...
    );

Liveness

TcpAliveListenerPlugin offers monitoring for liveness using TCP that can be used for services that don’t serve http.

var knightBus = new KnightBusHost()
    .UseTransport(...)
    .Configure(configuration => configuration
        .UseTcpAliveListener(port: 13000)
        ...
    );

Example using Kubernetes:

livenessProbe:
 tcpSocket:
  port: {{  .Values.ports.liveness  }}
 initialDelaySeconds: 10
 periodSeconds: 10
 timeoutSeconds: 3
 successThreshold: 1
 failureThreshold: 5

See https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-tcp-liveness-probe for more information on how to configure liveness with tcp.

Licence

KnightBus is licenced under the MIT licence.

Copyright (c) 2018 BookBeat

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Credits

Main Author

Niklas Arbin, Systems Architect @ BookBeat

Contributors

Unordered list of persons contributing their time and intellect to KnightBus

  • Tobias Johansson
  • Albin Carnstam
  • Viktor Hartenberger
  • Magnus Baneryd
  • Olov Siktröm
  • Simon Aunér
  • André Virdarson
  • Tobias Balzano
  • Peter Bergman