UberQueue challenge

David Fowler tweeted:

Coding challenge for you in any language. I want to see different ways of expressing this computation:

class UberQueue<T>
{
    private IAsyncQueue<T>[] _queues;

    public UberQueue(IAsyncQueue<T>[] queues)
    {
        _queues = queues;
    }

    Task<T> DequeueAsync()
    {
        // You're given an array of async queues and you need to
        // return the next item that's ready from any of those queues
    }
}

interface IAsyncQueue<T>
{
    Task<T> DequeueAsync();
}

The challenge was, of course, expressed in C#, and as luck would have it, that was my chosen implementation language!

Before I wrote any code, I mentally sketched out the solution that first came to me: I would concurrently read from all the queues and place the items in an “output” queue as they arrive. Then the dequeue operation is just a matter of reading from the output.

Longtime readers (all three of you) may recognize this general async queue pattern from my post about InputQueue over seven years ago, back in the .NET 4.5 days. However, I chose not to revisit that code when doing this challenge. Instead I started from a blank slate and TDD’d my way to a solution meeting the stated minimal requirements.

First, I needed an AsyncQueue. The theory here is the same as with InputQueue. We want to allow one async reader and many synchronous writers to this queue. If we try to dequeue and there are items ready, we return the next one; if there are no items, we return a pending task. The next enqueue should fulfill the pending task, if any; otherwise it should put the item in the queue. No sweat! AsyncQueue.cs implements these behaviors against the tests specified in AsyncQueueTest.cs:

  • EmptyPending
  • EnqueueThenDequeue
  • DequeueThenEnqueue
  • EnqueueTwoThenDequeueTwo
  • EmptyPendingDequeueAgain
  • DequeueThenEnqueueTwice
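
To make the behaviors above concrete, here is a minimal sketch of such a queue. It is not the actual AsyncQueue.cs; the field names, the lock object, and the choice to throw when a second dequeue is issued while one is still pending are all my assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IAsyncQueue<T>
{
    Task<T> DequeueAsync();
}

// Illustrative sketch: one async reader, many synchronous writers.
public sealed class AsyncQueue<T> : IAsyncQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private TaskCompletionSource<T> _pending; // non-null only while a reader is waiting

    public void Enqueue(T item)
    {
        TaskCompletionSource<T> waiter;
        lock (_gate)
        {
            waiter = _pending;
            if (waiter == null)
            {
                // Nobody is waiting, so store the item for a later dequeue.
                _items.Enqueue(item);
                return;
            }

            _pending = null;
        }

        // Fulfill the waiting reader outside the lock.
        waiter.SetResult(item);
    }

    public Task<T> DequeueAsync()
    {
        lock (_gate)
        {
            // Assumption: only one outstanding dequeue is allowed at a time.
            if (_pending != null)
            {
                throw new InvalidOperationException("A dequeue is already pending.");
            }

            if (_items.Count > 0)
            {
                return Task.FromResult(_items.Dequeue());
            }

            // No items yet: hand back a task that the next Enqueue completes.
            _pending = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            return _pending.Task;
        }
    }
}
```

RunContinuationsAsynchronously keeps the reader's continuation from running inline on the enqueuing thread, which seemed like the safer default for a sketch with many writers.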

Unit tests cannot easily cover multithreaded behavior, so I also wrote an integration app to demonstrate many concurrent enqueue threads with a single async dequeue loop. To verify nothing went wrong, I rigged it so that each enqueue loop wrote an encoded hex value where the lower bits contained the thread number and the upper bits contained a monotonically increasing value. This ensured that the dequeue operation would always see a fixed increment for each successive value per thread number (with no guarantees of which thread’s value it would observe next, of course). The initial app logic is here in Program.cs. Before you ask: yes, of course, I did test this with and without the locks. (As expected, the app crashes very quickly with corrupted results in the latter case.)
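
The encoding check can be sketched roughly as follows. This is not the code from Program.cs; the 8-bit thread field, the names SequenceCheck and SequenceVerifier, and the assumption that each thread's sequence starts at zero are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class SequenceCheck
{
    // Assumption: the low 8 bits hold the thread number (so at most 256 threads).
    public const int ThreadBits = 8;
    public const long ThreadMask = (1L << ThreadBits) - 1;

    // Packs the per-thread sequence number into the upper bits
    // and the thread number into the lower bits.
    public static long Encode(int thread, long sequence)
    {
        return (sequence << ThreadBits) | (uint)thread;
    }

    public static int ThreadOf(long value)
    {
        return (int)(value & ThreadMask);
    }

    public static long SequenceOf(long value)
    {
        return value >> ThreadBits;
    }
}

// Used by the single dequeue loop, so it needs no locking of its own.
public sealed class SequenceVerifier
{
    // Next sequence number expected from each thread (missing means 0).
    private readonly Dictionary<int, long> _next = new Dictionary<int, long>();

    // Returns false if the value skips or repeats a step for its thread.
    public bool Observe(long value)
    {
        int thread = SequenceCheck.ThreadOf(value);
        long sequence = SequenceCheck.SequenceOf(value);

        _next.TryGetValue(thread, out long expected);
        if (sequence != expected)
        {
            return false;
        }

        _next[thread] = expected + 1;
        return true;
    }
}
```

With this layout, SequenceCheck.Encode(3, 5) is 0x503, and successive values from the same thread differ by exactly 0x100 no matter how the threads interleave.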

With the core data structure out of the way, I could now write UberQueue. Once again, I TDD’d the requirements and ended up with UberQueue.cs and UberQueueTest.cs:

  • ZeroQueues
  • OneQueueDequeueEnqueue
  • TwoQueuesDequeueEnqueueTwiceAlternating
  • ThreeQueuesDequeueEnqueueSixTimesAlternating

Finally, I made some small changes to the integration app, using UberQueue with a small set of inner queues distributed among the threads: Program.cs.

The results seem correct, but there are obviously many situations that are not accounted for. The queue interface as given does not support cancellation, so the reader loops will (potentially) never end. Which exceptions can be thrown from dequeue, if any? It’s not mentioned, so none are handled in my solution. Should I be using System.Threading.Channels instead of inventing my own concurrent data structures? Probably, but what’s the fun in that?!
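
For comparison, the Channels route might look something like the sketch below. I have not built my solution this way; ChannelUberQueue and ChannelQueue are hypothetical names, and the small ChannelQueue type exists only so the example has inner queues to read from.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IAsyncQueue<T>
{
    Task<T> DequeueAsync();
}

// Hypothetical inner queue backed by an unbounded channel.
public sealed class ChannelQueue<T> : IAsyncQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    public void Enqueue(T item)
    {
        // TryWrite on an unbounded channel only fails once the writer is completed,
        // which this sketch never does.
        _channel.Writer.TryWrite(item);
    }

    public Task<T> DequeueAsync()
    {
        return _channel.Reader.ReadAsync().AsTask();
    }
}

public sealed class ChannelUberQueue<T>
{
    // The channel replaces the hand-rolled output queue.
    private readonly Channel<T> _output = Channel.CreateUnbounded<T>();

    public ChannelUberQueue(IAsyncQueue<T>[] queues)
    {
        foreach (IAsyncQueue<T> queue in queues)
        {
            _ = PumpAsync(queue);
        }
    }

    public Task<T> DequeueAsync()
    {
        return _output.Reader.ReadAsync().AsTask();
    }

    private async Task PumpAsync(IAsyncQueue<T> queue)
    {
        // Same never-ending read loop as before; still no cancellation.
        while (true)
        {
            T item = await queue.DequeueAsync().ConfigureAwait(false);
            _output.Writer.TryWrite(item);
        }
    }
}
```

The locking and pending-task bookkeeping disappear into the library, which is exactly why it is less fun.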

In any case, it was a nice weekend coding diversion. I’m sure we’ll see many other interesting solutions across the spectrum of frameworks and languages beyond .NET.
