The .NET 4.0+ solution to the producer-consumer problem is `BlockingCollection`. A sample app with a single producer and consumer using an ordered queue of integers would look something like this:
```csharp
private static async Task EnqueueLoopAsync(BlockingCollection<int> queue, CancellationToken token)
{
    int i = 0;
    while (!token.IsCancellationRequested)
    {
        ++i;
        queue.Add(i);
        await Task.Delay(1);
    }
}

private static void DequeueLoop(BlockingCollection<int> queue, CancellationToken token)
{
    int previous = 0;
    try
    {
        while (!token.IsCancellationRequested)
        {
            int current = queue.Take(token);
            if (current - previous != 1)
            {
                throw GetOutOfOrderError(current, previous);
            }

            previous = current;
        }
    }
    catch (OperationCanceledException)
    {
    }
}
```
This is simple enough, but the problem is that `DequeueLoop` spends a lot of time (relatively speaking) blocking the thread due to the synchronous `Take` method. About the best you can do, async-wise, is use `TryTake` with a small timeout and yield a bit on empty, for example:
```csharp
private static async Task DequeueLoopAsync(BlockingCollection<int> queue, CancellationToken token)
{
    int previous = 0;
    try
    {
        while (!token.IsCancellationRequested)
        {
            int current;
            if (queue.TryTake(out current, 1))
            {
                if (current - previous != 1)
                {
                    throw GetOutOfOrderError(current, previous);
                }

                previous = current;
            }
            else
            {
                await Task.Delay(1);
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
}
```
Certainly not ideal, but what did we expect from a `BlockingCollection` anyway?
For true async support, we’ll need to come up with a different data structure. And this is exactly what I did in my new QueueSample project.
The main class of interest is `InputQueue`. (If you’ve seen my earlier blog posts, you might note some similarities to `MemoryChannel`.) Its interface is as follows:
```csharp
public class InputQueue<T> : IInputQueue<T>
{
    public InputQueue();
    public Task<T> DequeueAsync();
    public void Enqueue(T item);
    public void Dispose();
}
```
Some implementation notes:
- For simplicity, `InputQueue` allows many producers but only a single consumer at a time.
- Again for simplicity, the only way to cancel a pending `DequeueAsync` operation is to `Dispose()` the queue.
- `Enqueue` can complete a pending `DequeueAsync` operation, which means that it could run synchronous continuations on the calling thread. Normally this isn’t a problem except in cases where you have a long chain of tasks that all complete synchronously. See this StackOverflow question and Stephen Toub’s blog post for more information.
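To make those notes concrete, here is a minimal sketch of how such a queue might be built on top of `TaskCompletionSource<T>`. This is an illustrative stand-in (with a hypothetical name, `SimpleInputQueue`), not the actual `InputQueue` source; it just demonstrates the same interface shape and the synchronous-continuation behavior described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch of an async input queue; not the real InputQueue code.
public sealed class SimpleInputQueue<T> : IDisposable
{
    private readonly object sync = new object();
    private readonly Queue<T> items = new Queue<T>();
    private TaskCompletionSource<T> pending; // at most one, per the single-consumer rule
    private bool disposed;

    public Task<T> DequeueAsync()
    {
        lock (this.sync)
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException("SimpleInputQueue");
            }

            if (this.items.Count > 0)
            {
                return Task.FromResult(this.items.Dequeue());
            }

            // Queue is empty; park a pending operation for Enqueue to complete.
            this.pending = new TaskCompletionSource<T>();
            return this.pending.Task;
        }
    }

    public void Enqueue(T item)
    {
        TaskCompletionSource<T> toComplete = null;
        lock (this.sync)
        {
            if (this.pending != null)
            {
                toComplete = this.pending;
                this.pending = null;
            }
            else
            {
                this.items.Enqueue(item);
            }
        }

        // SetResult may run awaiting continuations synchronously on this
        // (producer) thread -- exactly the caveat noted in the post.
        if (toComplete != null)
        {
            toComplete.SetResult(item);
        }
    }

    public void Dispose()
    {
        TaskCompletionSource<T> toCancel = null;
        lock (this.sync)
        {
            this.disposed = true;
            toCancel = this.pending;
            this.pending = null;
        }

        // Cancel-by-dispose: fault any pending dequeue so the consumer's
        // catch (ObjectDisposedException) handler can unwind.
        if (toCancel != null)
        {
            toCancel.SetException(new ObjectDisposedException("SimpleInputQueue"));
        }
    }
}
```

Completing the `TaskCompletionSource` outside the lock avoids running arbitrary continuations while holding it, but they still run synchronously on the producer's thread by default.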
With this true async dequeue method at our disposal, we can rewrite our consumer like so:
```csharp
private static async Task DequeueLoopAsync(IInputQueue<int> queue, CancellationToken token)
{
    int previous = 0;
    try
    {
        while (!token.IsCancellationRequested)
        {
            int current = await queue.DequeueAsync();
            if (current - previous != 1)
            {
                throw GetOutOfOrderError(current, previous);
            }

            previous = current;
        }
    }
    catch (ObjectDisposedException)
    {
    }
}
```
In a later post, I’ll talk about more ways we can use this non-blocking collection.