A simple message bus

Spread the love

The message bus is a typical pattern to allow loosely coupled software components to communicate, usually in an event-driven manner. Don’t let the enterprise-y description deter you — a simple message bus for a single process scenario can be quite easy to build. Let’s look at two different implementations in C# and C++.

First, some simplifying assumptions:

These sound like terrible limitations, but remember that we are looking at a single process scenario. In such cases, it is not only fine but often expected to wire up all subscriptions on application startup (prior to all sends) and to drop/forget any messages in flight on shutdown.

Let’s start with the C# implementation. First, as always, the unit tests:

    using System;
    using System.Collections.Generic;
    using FluentAssertions;
    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class MessageBusTest
    {
        [TestMethod]
        public void SendZeroSubscribers()
        {
            MessageBus bus = new MessageBus();

            Action act = () => bus.Send(new MyMessage("hello"));

            act.Should().NotThrow();
        }

        [TestMethod]
        public void SendOneSubscriber()
        {
            MessageBus bus = new MessageBus();
            List<string> received = new List<string>();
            Action<MyMessage> subscriber = m => received.Add(m.Text);

            bus.Subscribe(subscriber);
            bus.Send(new MyMessage("hello"));

            received.Should().ContainSingle().Which.Should().Be("hello");
        }

        [TestMethod]
        public void SendTwoSubscribers()
        {
            MessageBus bus = new MessageBus();
            List<string> received = new List<string>();
            Action<MyMessage> subscriber1 = m => received.Add(m.Text + "1");
            Action<MyMessage> subscriber2 = m => received.Add(m.Text + "2");

            bus.Subscribe(subscriber1);
            bus.Subscribe(subscriber2);
            bus.Send(new MyMessage("hello"));

            received.Should().HaveCount(2).And.ContainInOrder("hello1", "hello2");
        }

        [TestMethod]
        public void SendTwoOneSubscriberEach()
        {
            MessageBus bus = new MessageBus();
            List<string> received = new List<string>();
            Action<MyMessage> subscriber1 = m => received.Add(m.Text + "1");
            Action<MyOtherMessage> subscriber2 = m => received.Add(m.Text + "2");

            bus.Subscribe(subscriber1);
            bus.Subscribe(subscriber2);
            bus.Send(new MyMessage("one-hello"));
            bus.Send(new MyOtherMessage("two-hello"));

            received.Should().HaveCount(2).And.ContainInOrder("one-hello1", "two-hello2");
        }

        [TestMethod]
        public void SendTwoSimpleTypesOneSubscriberEach()
        {
            MessageBus bus = new MessageBus();
            List<string> received = new List<string>();
            Action<string> subscriber1 = m => received.Add("S=" + m);
            Action<int> subscriber2 = m => received.Add("N=" + m);

            bus.Subscribe(subscriber1);
            bus.Subscribe(subscriber2);
            bus.Send("xyz");
            bus.Send(123);

            received.Should().HaveCount(2).And.ContainInOrder("S=xyz", "N=123");
        }

        [TestMethod]
        public void SendTwoInstancesThreeSubscribersEachBeforeAndAfter()
        {
            MessageBus bus1 = new MessageBus();
            MessageBus bus2 = new MessageBus();
            List<string> received = new List<string>();
            Action<string> subscriber1 = m => received.Add("S1=" + m);
            Action<string> subscriber2 = m => received.Add("S2=" + m);
            Action<string> subscriber3 = m => received.Add("S3=" + m);
            Action<string> subscriber4 = m => received.Add("S4=" + m);
            Action<int> subscriber5 = m => received.Add("S5=" + m);
            Action<int> subscriber6 = m => received.Add("S6=" + m);

            bus1.Subscribe(subscriber1);
            bus2.Subscribe(subscriber2);
            bus1.Send("aaa");
            bus2.Send("bbb");
            bus1.Subscribe(subscriber3);
            bus2.Subscribe(subscriber4);
            bus1.Send("ccc");
            bus2.Send("ddd");
            bus1.Subscribe(subscriber5);
            bus2.Subscribe(subscriber6);
            bus1.Send(1);
            bus2.Send(2);

            received.Should().HaveCount(8).And.ContainInOrder(
                "S1=aaa", "S2=bbb", "S1=ccc", "S3=ccc", "S2=ddd", "S4=ddd", "S5=1", "S6=2");
        }

        private class MyMessage
        {
            public MyMessage(string text)
            {
                this.Text = text;
            }

            public string Text { get; }
        }

        private class MyOtherMessage : MyMessage
        {
            public MyOtherMessage(string text)
                : base(text)
            {
            }
        }
    }

Note that the subscribers here are simple strongly-typed generic delegates and that the messages can be any type. The following implementation satisfies these tests:

    using System;
    using System.Collections.Generic;

    public sealed class MessageBus
    {
        private readonly Dictionary<Type, Action<object>> subscribers;

        public MessageBus()
        {
            this.subscribers = new Dictionary<Type, Action<object>>();
        }

        public void Subscribe<TMessage>(Action<TMessage> subscriber)
        {
            Type key = typeof(TMessage);
            if (!this.subscribers.ContainsKey(key))
            {
                this.subscribers.Add(key, null);
            }

            this.subscribers[key] += m => subscriber((TMessage)m);
        }

        public void Send<TMessage>(TMessage message)
        {
            if (this.subscribers.TryGetValue(typeof(TMessage), out Action<object> action))
            {
                action(message);
            }
        }
    }

The code has the luxury of being very concise here in part due to the hidden bonus feature of .NET delegates — they all support multicast by default. We are also using the generic type parameter to our advantage, as the basis of the lookup key. The only slightly clunky part is that (TMessage) cast but I can’t think of a way around it.

Let’s move on to C++. We don’t have reflection here, so an equivalent solution will have to either use RTTI or some fancy compile-time template trick. I’ve opted for the latter, which I’ll discuss in a minute. First, we need to port the unit tests to their equivalents using the Microsoft Native C++ Unit Test Framework:

// MessageBusTest.cpp

#include "CppUnitTest.h"
#include "MessageBus.h"

#include <sstream>
#include <vector>

using namespace Microsoft::VisualStudio::CppUnitTestFramework;
using namespace std;

namespace Test
{
    TEST_CLASS(MessageBusTest)
    {
    public:
        TEST_METHOD(SendZeroSubscribers)
        {
            bool threwException = false;
            MessageBus bus;

            try
            {
                bus.Send(MyMessage("hello"));
            }
            catch (...)
            {
                threwException = true;
            }

            Assert::IsFalse(threwException);
        }

        TEST_METHOD(SendOneSubscriber)
        {
            MessageBus bus;
            vector<string> received;
            auto subscriber = [&received](MyMessage& m) { received.push_back(m.get_Text()); };

            bus.Subscribe<MyMessage>(subscriber);
            bus.Send(MyMessage("hello"));

            Assert::AreEqual(size_t(1), received.size());
            Assert::AreEqual("hello", received[0].c_str());
        }

        TEST_METHOD(SendTwoSubscribers)
        {
            MessageBus bus;
            vector<string> received;
            auto subscriber1 = [&received](MyMessage& m) { received.push_back(m.get_Text() + "1"); };
            auto subscriber2 = [&received](MyMessage& m) { received.push_back(m.get_Text() + "2"); };

            bus.Subscribe<MyMessage>(subscriber1);
            bus.Subscribe<MyMessage>(subscriber2);
            bus.Send(MyMessage("hello"));

            Assert::AreEqual(size_t(2), received.size());
            Assert::AreEqual("hello1", received[0].c_str());
            Assert::AreEqual("hello2", received[1].c_str());
        }

        TEST_METHOD(SendTwoOneSubscriberEach)
        {
            MessageBus bus;
            vector<string> received;
            auto subscriber1 = [&received](MyMessage& m) { received.push_back(m.get_Text() + "1"); };
            auto subscriber2 = [&received](MyOtherMessage& m) { received.push_back(m.get_Text() + "2"); };

            bus.Subscribe<MyMessage>(subscriber1);
            bus.Subscribe<MyOtherMessage>(subscriber2);
            bus.Send(MyMessage("one-hello"));
            bus.Send(MyOtherMessage("two-hello"));

            Assert::AreEqual(size_t(2), received.size());
            Assert::AreEqual("one-hello1", received[0].c_str());
            Assert::AreEqual("two-hello2", received[1].c_str());
        }

        TEST_METHOD(SendTwoSimpleTypesOneSubscriberEach)
        {
            MessageBus bus;
            vector<string> received;
            auto subscriber1 = [&received](string& m) { received.push_back(string("S=") + m); };
            auto subscriber2 = [&received](int& m)
            {
                stringstream ss;
                ss << "N=" << m;
                received.push_back(ss.str());
            };

            bus.Subscribe<string>(subscriber1);
            bus.Subscribe<int>(subscriber2);
            bus.Send(string("xyz"));
            int msg = 123;
            bus.Send(msg);

            Assert::AreEqual(size_t(2), received.size());
            Assert::AreEqual("S=xyz", received[0].c_str());
            Assert::AreEqual("N=123", received[1].c_str());
        }

        TEST_METHOD(SendTwoInstancesThreeSubscribersEachBeforeAndAfter)
        {
            MessageBus bus1;
            MessageBus bus2;
            vector<string> received;
            auto subscriber1 = [&received](string& m) { received.push_back(string("S1=") + m); };
            auto subscriber2 = [&received](string& m) { received.push_back(string("S2=") + m); };
            auto subscriber3 = [&received](string& m) { received.push_back(string("S3=") + m); };
            auto subscriber4 = [&received](string& m) { received.push_back(string("S4=") + m); };
            auto subscriber5 = [&received](int& m)
            {
                stringstream ss;
                ss << "S5=" << m;
                received.push_back(ss.str());
            };
            auto subscriber6 = [&received](int& m)
            {
                stringstream ss;
                ss << "S6=" << m;
                received.push_back(ss.str());
            };

            bus1.Subscribe<string>(subscriber1);
            bus2.Subscribe<string>(subscriber2);
            bus1.Send(string("aaa"));
            bus2.Send(string("bbb"));
            bus1.Subscribe<string>(subscriber3);
            bus2.Subscribe<string>(subscriber4);
            bus1.Send(string("ccc"));
            bus2.Send(string("ddd"));
            bus1.Subscribe<int>(subscriber5);
            bus2.Subscribe<int>(subscriber6);
            int msg = 1;
            bus1.Send(msg);
            msg = 2;
            bus2.Send(msg);

            Assert::AreEqual(size_t(8), received.size());
            Assert::AreEqual("S1=aaa", received[0].c_str());
            Assert::AreEqual("S2=bbb", received[1].c_str());
            Assert::AreEqual("S1=ccc", received[2].c_str());
            Assert::AreEqual("S3=ccc", received[3].c_str());
            Assert::AreEqual("S2=ddd", received[4].c_str());
            Assert::AreEqual("S4=ddd", received[5].c_str());
            Assert::AreEqual("S5=1", received[6].c_str());
            Assert::AreEqual("S6=2", received[7].c_str());
        }

        class MyMessage
        {
        public:
            MyMessage(const string& text)
                : text_(text)
            {
            }

            const string& get_Text() const
            {
                return text_;
            }

        private:
            string text_;
        };

        class MyOtherMessage : public MyMessage
        {
        public:
            MyOtherMessage(const string& text)
                : MyMessage(text)
            {
            }
        };
    };
}

The only noticeable differences here are the more verbose assertions (maybe I’ll try out Chamois later) and the inability to use template argument deduction for the Subscribe calls (probably because I’m using auto with lambdas instead of directly assigning std::function instances). Now for the code:

// MessageBus.h

#pragma once

#include <functional>
#include <map>
#include <vector>

class MessageBus
{
private:
    typedef std::function<void(void*)> Func;
    typedef std::vector<Func> FuncList;
    typedef void* TypeId;
    typedef std::map<TypeId, FuncList> Map;

public:
    MessageBus()
        : subscribers_()
    {
    }

    template<typename TMessage>
    void Subscribe(std::function<void(TMessage&)> subscriber)
    {
        TypeId key = Id<TMessage>();
        FuncList& list = subscribers_[key];
        Func f = [subscriber](void* message) { subscriber(*static_cast<TMessage*>(message)); };
        list.push_back(f);
    }

    template<typename TMessage>
    void Send(TMessage& message) const
    {
        TypeId key = Id<TMessage>();
        auto it = subscribers_.find(key);
        if (it != subscribers_.cend())
        {
            const FuncList& list = it->second;
            for (auto f : list)
            {
                f(&message);
            }
        }
    }

private:
    MessageBus(const MessageBus&) = delete;
    MessageBus& operator=(const MessageBus&) = delete;

    template <typename T>
    static TypeId Id()
    {
        static T* id = nullptr;
        return &id;
    }

    Map subscribers_;
};

Since C++ doesn’t have multicast delegates, we are using a std::vector of std::functions. You can also see the template trick I alluded to above to get a consistent ID for each message type, adapted from araud‘s StackOverflow answer. Otherwise, this is fairly close in spirit to the C# version, including the “System.Object” analog via the lambda with void* pointer cast.

So, obviously, the C++ implementation is going to be faster, due to the lack of reflection, right? Let’s write a simple benchmark to find out:

#include <chrono>
#include <iostream>

#include "MessageBus.h"

using namespace std;

void Benchmark(int iterations)
{
    size_t count = 0;
    auto subscriber = [&count](const wstring& m) { count += m.length(); };
    MessageBus bus;

    bus.Subscribe<wstring>(subscriber);

    auto func = [&bus](int n)
    {
        for (int i = 0; i < n; ++i)
        {
            for (wchar_t c = L'A'; c <= L'Z'; ++c)
            {
                bus.Send(wstring(1, c));
            }
        }
    };

    chrono::high_resolution_clock clock;
    auto start = clock.now();

    func(iterations);

    auto end = clock.now();
    auto duration = end - start;
    double nsecPerOp = static_cast<double>(duration.count()) / iterations;

    wcout << L"Average operation time: " << nsecPerOp << L" ns (var count = " << count << L")" << endl;
}

int main()
{
    Benchmark(1);
    Benchmark(16);
    Benchmark(256);
    Benchmark(4096);
    Benchmark(65536);
    Benchmark(262144);
    Benchmark(524288);
    Benchmark(524288);
    Benchmark(524288);
    Benchmark(524288);
    Benchmark(524288);

    return 0;
}

Here is the output on my system:

Average operation time: 2171 ns (var count = 26)
Average operation time: 1880.13 ns (var count = 416)
Average operation time: 1848.61 ns (var count = 6656)
Average operation time: 1884.88 ns (var count = 106496)
Average operation time: 1938 ns (var count = 1703936)
Average operation time: 1867.31 ns (var count = 6815744)
Average operation time: 1871.75 ns (var count = 13631488)
Average operation time: 1863.55 ns (var count = 13631488)
Average operation time: 1851.69 ns (var count = 13631488)
Average operation time: 1854.21 ns (var count = 13631488)
Average operation time: 1854.9 ns (var count = 13631488)

Let’s call this 1860 ns per operation. Now for the same benchmark, translated into C#:

    using System;
    using System.Diagnostics;

    internal static class Program
    {
        private static void Benchmark(int iterations)
        {
            ulong count = 0;
            Action<string> subscriber = m => count += (uint)m.Length;
            MessageBus bus = new MessageBus();

            bus.Subscribe(subscriber);

            Action<int> func = n =>
            {
                for (int i = 0; i < n; ++i)
                {
                    for (char c = 'A'; c <= 'Z'; ++c)
                    {
                        string msg = new string(c, 1);
                        bus.Send(msg);
                    }
                }
            };

            Stopwatch clock = Stopwatch.StartNew();

            func(iterations);

            TimeSpan duration = clock.Elapsed;
            double nsecPerOp = (duration.Ticks * 100.0) / iterations;

            Console.WriteLine($"Average operation time: {nsecPerOp} ns (var count = {count})");
        }

        private static void Main(string[] args)
        {
            Benchmark(1);
            Benchmark(16);
            Benchmark(256);
            Benchmark(4096);
            Benchmark(65536);
            Benchmark(262144);
            Benchmark(524288);
            Benchmark(524288);
            Benchmark(524288);
            Benchmark(524288);
            Benchmark(524288);
        }
    }

The output on my system:

Average operation time: 1030200 ns (var count = 26)
Average operation time: 1550 ns (var count = 416)
Average operation time: 1550.390625 ns (var count = 6656)
Average operation time: 1625.09765625 ns (var count = 106496)
Average operation time: 1131.09283447266 ns (var count = 1703936)
Average operation time: 1075.38948059082 ns (var count = 6815744)
Average operation time: 1042.32959747314 ns (var count = 13631488)
Average operation time: 1047.92461395264 ns (var count = 13631488)
Average operation time: 1191.17279052734 ns (var count = 13631488)
Average operation time: 1095.16143798828 ns (var count = 13631488)
Average operation time: 1215.71636199951 ns (var count = 13631488)

Aside from some rather excessive warmup time, the results here are quite a bit faster at ~1100 ns per operation. How can this be?!

It turns out I have a small mistake in my C++ implementation. Refer back to the Send method:

    template<typename TMessage>
    void Send(const TMessage& message) const
    {
        TypeId key = Id<TMessage>();
        auto it = subscribers_.find(key);
        if (it != subscribers_.cend())
        {
            const FuncList& list = it->second;
            for (auto f : list)
            {
                f(&message);
            }
        }
    }

Do you see the issue? If not, know that auto drops reference qualifiers. I should have written auto& f; as it is, the function is copied to a new local variable instead of being invoked from the reference. Making that change and rerunning the benchmark gives the following output:

Average operation time: 310 ns (var count = 26)
Average operation time: 174.438 ns (var count = 416)
Average operation time: 167.176 ns (var count = 6656)
Average operation time: 156.878 ns (var count = 106496)
Average operation time: 154.034 ns (var count = 1703936)
Average operation time: 151.468 ns (var count = 6815744)
Average operation time: 154.744 ns (var count = 13631488)
Average operation time: 151.567 ns (var count = 13631488)
Average operation time: 158.178 ns (var count = 13631488)
Average operation time: 150.577 ns (var count = 13631488)
Average operation time: 150.845 ns (var count = 13631488)

There we go — better than 10X performance improvement.

Now, if only C# had a workable compile-time type information system. (Unfortunately, nameof doesn’t cut it.)

One thought on “A simple message bus

  1. Pingback: Building an adventure game: part 1 – WriteAsync .NET

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload the CAPTCHA.