A simple message bus: Java edition

Spread the love

In the previous post, we looked at competing message bus implementations in C# and C++. How about we give Java a try now?

Converting to Java syntax and style conventions, the SendOneSubscriber test should look like this:

package com.example;

import java.util.function.*;
import java.util.*;
import org.junit.Test;

import static org.hamcrest.Matchers.*;
import static org.hamcrest.MatcherAssert.assertThat;

public class MessageBusTest {
    // . . .

    @Test
    public void testSendOneSubscriber() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<MyMessage> subscriber = m -> received.add(m.getText());

        bus.subscribe(subscriber);
        bus.send(new MyMessage("hello"));

        assertThat(received, contains("hello"));
    }

    // . . .
}

Alas, there is a big problem here. Let’s first review what the API for MessageBus should look like given the test above:

package com.example;

import java.util.function.*;

public class MessageBus {
    public MessageBus() {
        /* . . . */
    }

    public <M> void subscribe(Consumer<M> subscriber) {
        /* . . . */
    }

    public <M> void send(M message) {
        /* . . . */
    }
}

Ideally, inside those generic methods we would retrieve the type information about M and use this as a lookup key for a HashMap. Unfortunately, the implementation of Java generics is based on type erasure. By definition, we have no runtime information available to us about what type M actually was in the source code. Techniques such as TypeLiteral in various Java libraries exist to solve related type erasure problems but cannot seem to help us in this instance.

As yet, I have not found a way to make this type of generically typed API work as is. We will simply have to alter the API and force the caller to pass a type token as an additional parameter:

package com.example;

import java.util.function.*;

public class MessageBus {
    public MessageBus() {
        /* . . . */
    }

    public <M> void subscribe(Consumer<M> subscriber, Class<M> type) {
        /* . . . */
    }

    public <M> void send(M message, Class<M> type) {
        /* . . . */
    }
}

The test would change as follows:

    /* . . . */

    @Test
    public void testSendOneSubscriber() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<MyMessage> subscriber = m -> received.add(m.getText());

        bus.subscribe(subscriber, MyMessage.class);
        bus.send(new MyMessage("hello"), MyMessage.class);

        assertThat(received, contains("hello"));
    }

    /* . . . */

It’s a bit ugly but should be type safe. For example, it is not possible to compile this code:

        Consumer<String> subscriber = m -> /* . . . */;

        // ERROR: The method subscribe(M, Class<M>) in the type MessageBus is not
        // applicable for the arguments (Consumer<String>, Class<Integer>)
        bus.subscribe(subscriber, Integer.class);

Here is the entire test suite translated to Java, using the convenient Hamcrest matcher objects for fluent syntax:

package com.example;

import java.util.function.*;
import java.util.*;
import org.junit.Test;

import static org.hamcrest.Matchers.*;
import static org.hamcrest.MatcherAssert.assertThat;

public class MessageBusTest {
    @Test
    public void testSendZeroSubscribers() {
        MessageBus bus = new MessageBus();

        boolean threwException = false;
        try {
            bus.send(new MyMessage("hello"), MyMessage.class);
        } catch (Exception e) {
            threwException = true;
        }

        assertThat(threwException, is(false));
    }

    @Test
    public void testSendOneSubscriber() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<MyMessage> subscriber = m -> received.add(m.getText());

        bus.subscribe(subscriber, MyMessage.class);
        bus.send(new MyMessage("hello"), MyMessage.class);

        assertThat(received, contains("hello"));
    }

    @Test
    public void testSendTwoSubscribers() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<MyMessage> subscriber1 = m -> received.add(m.getText() + "1");
        Consumer<MyMessage> subscriber2 = m -> received.add(m.getText() + "2");

        bus.subscribe(subscriber1, MyMessage.class);
        bus.subscribe(subscriber2, MyMessage.class);
        bus.send(new MyMessage("hello"), MyMessage.class);

        assertThat(received, contains("hello1", "hello2"));
    }

    @Test
    public void testSendTwoOneSubscriberEach() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<MyMessage> subscriber1 = m -> received.add(m.getText() + "1");
        Consumer<MyOtherMessage> subscriber2 = m -> received.add(m.getText() + "2");

        bus.subscribe(subscriber1, MyMessage.class);
        bus.subscribe(subscriber2, MyOtherMessage.class);
        bus.send(new MyMessage("one-hello"), MyMessage.class);
        bus.send(new MyOtherMessage("two-hello"), MyOtherMessage.class);

        assertThat(received, contains("one-hello1", "two-hello2"));
    }

    @Test
    public void testSendTwoSimpleTypesOneSubscriberEach() {
        MessageBus bus = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<String> subscriber1 = m -> received.add("S=" + m);
        Consumer<Integer> subscriber2 = m -> received.add("N=" + m);

        bus.subscribe(subscriber1, String.class);
        bus.subscribe(subscriber2, Integer.class);
        bus.send("xyz", String.class);
        bus.send(123, Integer.class);

        assertThat(received, contains("S=xyz", "N=123"));
    }

    @Test
    public void testSendTwoInstancesThreeSubscribersEachBeforeAndAfter() {
        MessageBus bus1 = new MessageBus();
        MessageBus bus2 = new MessageBus();
        ArrayList<String> received = new ArrayList<String>();
        Consumer<String> subscriber1 = m -> received.add("S1=" + m);
        Consumer<String> subscriber2 = m -> received.add("S2=" + m);
        Consumer<String> subscriber3 = m -> received.add("S3=" + m);
        Consumer<String> subscriber4 = m -> received.add("S4=" + m);
        Consumer<Integer> subscriber5 = m -> received.add("S5=" + m);
        Consumer<Integer> subscriber6 = m -> received.add("S6=" + m);

        bus1.subscribe(subscriber1, String.class);
        bus2.subscribe(subscriber2, String.class);
        bus1.send("aaa", String.class);
        bus2.send("bbb", String.class);
        bus1.subscribe(subscriber3, String.class);
        bus2.subscribe(subscriber4, String.class);
        bus1.send("ccc", String.class);
        bus2.send("ddd", String.class);
        bus1.subscribe(subscriber5, Integer.class);
        bus2.subscribe(subscriber6, Integer.class);
        bus1.send(1, Integer.class);
        bus2.send(2, Integer.class);

        assertThat(received, contains("S1=aaa", "S2=bbb", "S1=ccc", "S3=ccc", "S2=ddd", "S4=ddd", "S5=1", "S6=2"));
    }

    private class MyMessage {
        private final String text;

        public MyMessage(String text) {
            this.text = text;
        }

        public String getText() {
            return this.text;
        }
    }

    private class MyOtherMessage extends MyMessage {
        public MyOtherMessage(String text) {
            super(text);
        }
    }
}

These tests are somewhat closer to the C# implementation than the C++, in large part due to Hamcrest. There doesn’t seem to be a nice fluent way to assert for exceptions, though. FYI, since JUnit 4.12 includes an older version of Hamcrest, I updated my build.gradle file for the java-library plugin to include Hamcrest 2.0 like so:

dependencies {
    // . . .
    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
    
    // Use Hamcrest 2.0
    testImplementation('org.hamcrest:java-hamcrest:2.0.0.0')
}

Finally, let’s look at the implementation code:

package com.example;

import java.util.*;
import java.util.function.*;

public class MessageBus {
    private final HashMap<Class<?>, ArrayList<Consumer<Object>>> map;

    public MessageBus() {
        this.map = new HashMap<Class<?>, ArrayList<Consumer<Object>>>();
    }

    public <M> void subscribe(Consumer<M> subscriber, Class<M> type) {
        ArrayList<Consumer<Object>> list = this.map.get(type);
        if (list == null) {
            list = new ArrayList<Consumer<Object>>();
            this.map.put(type, list);
        }

        @SuppressWarnings("unchecked")
        Consumer<Object> f = m -> subscriber.accept((M) m);
        list.add(f);
    }

    public <M> void send(M message, Class<M> type) {
        ArrayList<Consumer<Object>> list = this.map.get(type);
        if (list != null) {
            for (Consumer<Object> f : list) {
                f.accept(message);
            }
        }
    }
}

Like C++, Java has no multicast delegates, so we’re back to the “list of functions” solution. Other than that, the code is roughly as concise as the C# version. Note the use of the unbounded wildcard type Class<?> which is roughly equivalent to System.Type in the C# solution, and the need to suppress the “unchecked” warning due to the Object to M cast (we know it is safe here, but due to type erasure, the runtime does not).

Of course, the exercise wouldn’t be complete without the ported benchmark. Here it is:

package com.example;

import java.util.function.*;

public class Program {
    public static void main(String[] args) {
        benchmark(1);
        benchmark(16);
        benchmark(256);
        benchmark(4096);
        benchmark(65536);
        benchmark(262144);
        benchmark(524288);
        benchmark(524288);
        benchmark(524288);
        benchmark(524288);
        benchmark(524288);
    }

    private static void benchmark(int iterations) {
        LongWrapper count = new LongWrapper();
        Consumer<String> subscriber = m -> count.increment(m.length());
        MessageBus bus = new MessageBus();

        bus.subscribe(subscriber, String.class);

        Consumer<Integer> func = n -> {
            for (int i = 0; i < n; ++i) {
                for (char c = 'A'; c <= 'Z'; ++c) {
                    String msg = new String(c + "");
                    bus.send(msg, String.class);
                }
            }
        };

        long start = System.nanoTime();

        func.accept(iterations);

        long end = System.nanoTime();
        double nsecPerOp = 1.0 * (end - start) / iterations;

        System.out.printf("Average operation time: %f ns (var count = %d)\n", nsecPerOp, count.get());
    }

    private static class LongWrapper {
        private long value;

        public LongWrapper() {
            this.value = 0L;
        }

        public void increment(long inc) {
            this.value += inc;
        }

        public long get() {
            return this.value;
        }
    }
}

Remember, Java doesn’t have unsigned types and local variables cannot be mutated within a lambda expression (necessitating the LongWrapper workaround that is otherwise done “for free” in the .NET case). The results:

Average operation time: 176769.000000 ns (var count = 26)
Average operation time: 62644.437500 ns (var count = 416)
Average operation time: 20396.523438 ns (var count = 6656)
Average operation time: 8717.085205 ns (var count = 106496)
Average operation time: 1592.115631 ns (var count = 1703936)
Average operation time: 1398.114811 ns (var count = 6815744)
Average operation time: 1432.140196 ns (var count = 13631488)
Average operation time: 852.286091 ns (var count = 13631488)
Average operation time: 816.319372 ns (var count = 13631488)
Average operation time: 835.554661 ns (var count = 13631488)
Average operation time: 869.618492 ns (var count = 13631488)

What we traded in API convenience seems to have paid back in runtime performance. The Java implementation which stabilizes at ~850 ns obviously won’t beat C++ but is apparently faster than the C# implementation by almost 25%.

Who says Java is slow?!

Leave a Reply

Your email address will not be published. Required fields are marked *