The producer-consumer problem is one of the greatest hits of computer science. The classic solution involves some sort of queue data structure and an event or two to notify the consumer(s). Let’s take a look at a simple implementation and do some performance analysis.
For this experiment, we will start by analyzing an implementation using a ConcurrentQueue, an AutoResetEvent for waking the consumer, and a ManualResetEvent to notify on shutdown. We shall call it WorkerQueue. It will automatically start a fixed number of consumer threads on construction and cancel them all on Dispose. A user-specified onReceive callback will be invoked on every item processed.
Here is the code:
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class WorkerQueue<T> : IDisposable
{
    private readonly ConcurrentQueue<T> queue;
    private readonly Action<int, T> onReceive;
    private readonly AutoResetEvent dataAvailable;
    private readonly ManualResetEvent canceled;
    private readonly Thread[] threads;

    public WorkerQueue(int threads, Action<int, T> onReceive)
    {
        this.queue = new ConcurrentQueue<T>();
        this.onReceive = onReceive;
        this.dataAvailable = new AutoResetEvent(false);
        this.canceled = new ManualResetEvent(false);
        this.threads = new Thread[threads];
        for (int i = 0; i < threads; ++i)
        {
            Thread thread = new Thread(Receive);
            thread.Start(i);
            this.threads[i] = thread;
        }
    }

    public void Add(T item)
    {
        this.queue.Enqueue(item);
        this.dataAvailable.Set();
    }

    public void Dispose()
    {
        this.canceled.Set();
        foreach (Thread thread in this.threads)
        {
            thread.Join();
        }

        this.canceled.Dispose();
        this.dataAvailable.Dispose();
    }

    private void Receive(object objThread)
    {
        int thread = (int)objThread;
        WaitHandle[] waitHandles = new WaitHandle[] { this.canceled, this.dataAvailable };
        bool done = false;
        do
        {
            // Wake on either cancellation (index 0) or new data (index 1).
            int index = WaitHandle.WaitAny(waitHandles);
            done = index == 0;
            while (!done && this.queue.TryDequeue(out T data))
            {
                // Re-check cancellation without blocking so a busy queue cannot
                // delay shutdown indefinitely; the just-dequeued item is still
                // processed even if cancellation was requested.
                done = this.canceled.WaitOne(0);
                this.onReceive(thread, data);
            }
        }
        while (!done);
    }
}
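Usage looks like this (a quick sketch, not part of the sample itself; the Sleep is just a crude way to let the consumers drain before Dispose stops them):

// Quick usage sketch: two consumers printing items along with the thread index that handled them.
using (WorkerQueue<string> queue = new WorkerQueue<string>(2, (t, s) => Console.WriteLine("[{0}] {1}", t, s)))
{
    queue.Add("hello");
    queue.Add("world");

    // Crude pause so the consumers can drain the queue; Dispose cancels the
    // threads and does not wait for remaining items.
    Thread.Sleep(100);
}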
Of course I would be remiss if I didn’t include unit tests, so here those are:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public sealed class WorkerQueueTest
{
    [TestMethod]
    public void OneThread()
    {
        List<int> received = new List<int>();
        using (WorkerQueue<int> queue = new WorkerQueue<int>(1, (t, i) => received.Add(i + t)))
        {
            for (int i = 0; i < 1000; ++i)
            {
                queue.Add(i);
            }

            while (received.Count == 0)
            {
            }
        }

        int count = received.Count;
        count.Should().BeGreaterThan(0);
        received.Should().ContainInOrder(Enumerable.Range(0, count));
    }

    [TestMethod]
    public void TwoThreads()
    {
        int sum = 0;
        using (WorkerQueue<int> queue = new WorkerQueue<int>(2, (_, i) => Interlocked.Add(ref sum, i)))
        {
            for (int i = 1; i <= 1000; ++i)
            {
                queue.Add(i);
            }

            while (Volatile.Read(ref sum) == 0)
            {
            }
        }

        sum.Should().Be(500500); // 1 + 2 + ... + 999 + 1000
    }

    [TestMethod]
    public void FourThreads()
    {
        int sum = 0;
        using (WorkerQueue<int> queue = new WorkerQueue<int>(4, (_, i) => Interlocked.Add(ref sum, i)))
        {
            for (int i = 1; i <= 1000; ++i)
            {
                queue.Add(i);
            }

            while (Volatile.Read(ref sum) == 0)
            {
            }
        }

        sum.Should().Be(500500); // 1 + 2 + ... + 999 + 1000
    }

    [TestMethod]
    public void TwoThreadsOneBlocked()
    {
        List<int> received = new List<int>();
        ManualResetEventSlim block = new ManualResetEventSlim();
        Action<int, int> onReceive = (t, i) =>
        {
            if (t == 0)
            {
                block.Wait();
            }
            else
            {
                received.Add(i);
            }
        };
        using (block)
        using (WorkerQueue<int> queue = new WorkerQueue<int>(2, onReceive))
        {
            for (int i = 0; i <= 1000; ++i)
            {
                queue.Add(i);
            }

            while (received.Count != 1000)
            {
            }

            block.Set();
        }

        received.Should().HaveCount(1000).And.ContainInOrder(Enumerable.Range(1, 1000));
    }

    [TestMethod]
    public void TwoThreadsBothBlocked()
    {
        int received = 0;
        ManualResetEventSlim block = new ManualResetEventSlim();
        Action<int, int> onReceive = (t, i) =>
        {
            Interlocked.Add(ref received, i + t);
            block.Wait();
        };
        using (block)
        using (WorkerQueue<int> queue = new WorkerQueue<int>(2, onReceive))
        {
            for (int i = 1; i <= 1000; ++i)
            {
                queue.Add(i);
            }

            while (Volatile.Read(ref received) != 4)
            {
            }

            block.Set();
        }

        received.Should().Be(12); // (1 + 2 + 3 + 4) + (0 + 0 + 1 + 1)
    }
}
Before we can do any measurements, we need a scenario and some infrastructure to drive it. For simplicity, we will assume a single-threaded sender and apply a fixed operation latency for every send and receive call. We will want to gather key performance metrics like private bytes, thread count, and of course the total number of sent and received messages. This Operations class will do all that:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Note: Throttle, Rate, RealClock, and ProcessInfo are small supporting types not shown here.
internal sealed class Operations
{
    private Throttle sendThrottle;
    private Throttle[] receiveThrottles;
    private CancellationTokenSource cts;
    private Task sendTask;
    private Task reportTask;
    private long sent;
    private long received;

    public Operations(int receiveThreads, Rate sendRate, Rate receiveRate)
    {
        RealClock clock = new RealClock();
        this.sendThrottle = new Throttle(sendRate, clock);
        this.receiveThrottles = new Throttle[receiveThreads];
        for (int i = 0; i < receiveThreads; ++i)
        {
            this.receiveThrottles[i] = new Throttle(receiveRate, clock);
        }
    }

    public void Start(WorkerQueue<long> queue)
    {
        this.cts = new CancellationTokenSource();
        this.reportTask = this.ReportAsync(this.cts.Token);
        this.sendTask = Task.Run(() => this.Send(queue, this.cts.Token));
    }

    public void Stop()
    {
        using (this.cts)
        {
            this.cts.Cancel();
            this.sendTask.Wait();
        }
    }

    public void OnReceive(int thread)
    {
        this.receiveThrottles[thread].Wait();
        Interlocked.Increment(ref this.received);
    }

    private void Send(WorkerQueue<long> queue, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            queue.Add(Interlocked.Increment(ref this.sent));
            this.sendThrottle.Wait();
        }
    }

    private async Task ReportAsync(CancellationToken token)
    {
        try
        {
            Stopwatch stopwatch = Stopwatch.StartNew();
            Console.WriteLine("Time,Sent,Received,Threads,CpuTime,PrivateBytes");
            while (!token.IsCancellationRequested)
            {
                await Task.Delay(1000, token);
                ReportOne(stopwatch.Elapsed);
            }
        }
        catch (OperationCanceledException)
        {
        }
    }

    private void ReportOne(TimeSpan elapsed)
    {
        using (ProcessInfo info = ProcessInfo.Get())
        {
            Console.WriteLine(
                "{0:000.000},{1:000000000},{2:000000000},{3:000},{4:000.000},{5:000000000}",
                elapsed.TotalSeconds,
                Volatile.Read(ref this.sent),
                Volatile.Read(ref this.received),
                info.Threads,
                info.CpuTime,
                info.PrivateBytes);
        }
    }
}
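The Throttle, Rate, RealClock, and ProcessInfo helpers referenced above aren't shown. If you want to run this yourself, minimal stand-in versions could look like the following (a sketch under my own assumptions, not necessarily how the originals work):

using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical minimal stand-ins for the helper types used by Operations.
public struct Rate
{
    private Rate(double perSecond) { this.PerSecondValue = perSecond; }

    public double PerSecondValue { get; }

    public static Rate PerSecond(double value) => new Rate(value);
}

public sealed class RealClock
{
    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public TimeSpan Elapsed => this.stopwatch.Elapsed;
}

public sealed class Throttle
{
    private readonly RealClock clock;
    private readonly double perSecond;
    private long count;

    public Throttle(Rate rate, RealClock clock)
    {
        this.perSecond = rate.PerSecondValue;
        this.clock = clock;
    }

    public void Wait()
    {
        // After n operations we should be at least n / rate seconds into the run;
        // sleep off any time we are ahead of schedule.
        long n = Interlocked.Increment(ref this.count);
        TimeSpan due = TimeSpan.FromSeconds(n / this.perSecond);
        TimeSpan ahead = due - this.clock.Elapsed;
        if (ahead > TimeSpan.Zero)
        {
            Thread.Sleep(ahead);
        }
    }
}

public sealed class ProcessInfo : IDisposable
{
    private readonly Process process;

    private ProcessInfo(Process process) { this.process = process; }

    public static ProcessInfo Get() => new ProcessInfo(Process.GetCurrentProcess());

    public int Threads => this.process.Threads.Count;

    public double CpuTime => this.process.TotalProcessorTime.TotalSeconds;

    public long PrivateBytes => this.process.PrivateMemorySize64;

    public void Dispose() => this.process.Dispose();
}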
Finally, we need a program entry point to execute the scenario:
using System;
using System.Threading;

internal static class Program
{
    private static void Main(string[] args)
    {
        if (args.Length < 3)
        {
            Console.WriteLine("Please specify the thread count, send rate (per sec), and receive rate (per sec).");
            Console.WriteLine("Optionally provide the total duration (sec) for non-interactive mode.");
            return;
        }

        int threads = int.Parse(args[0]);
        Rate sendRate = Rate.PerSecond(double.Parse(args[1]));
        Rate receiveRate = Rate.PerSecond(double.Parse(args[2]));
        TimeSpan duration = TimeSpan.Zero;
        if (args.Length > 3)
        {
            duration = TimeSpan.FromSeconds(int.Parse(args[3]));
        }

        Operations ops = new Operations(threads, sendRate, receiveRate);
        using (WorkerQueue<long> queue = new WorkerQueue<long>(threads, (t, _) => ops.OnReceive(t)))
        {
            if (duration == TimeSpan.Zero)
            {
                Console.WriteLine("Press ENTER to quit.");
            }
            else
            {
                Console.WriteLine("Running for {0}...", duration);
            }

            ops.Start(queue);
            if (duration == TimeSpan.Zero)
            {
                Console.ReadLine();
            }
            else
            {
                Thread.Sleep(duration);
            }

            ops.Stop();
        }

        Console.WriteLine("Done.");
    }
}
Whew! Performance testing is hard work! But now we’re ready to go. Let’s consider our first performance question: what is the maximum send rate we can achieve without overwhelming the consumer(s)?
The easiest way to figure this out is to set the thread count to 1 and set both the send and receive rates to some number R. We then try various values of R and see where the receiver begins lagging.
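For example, assuming the project builds to an executable named WorkerQueueApp.exe (the name here is just illustrative), a 60-second non-interactive run with one consumer thread at R = 800K/sec would look like:

WorkerQueueApp.exe 1 800000 800000 60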
Here are the results I got on my computer (“Intel Core i7-3960X CPU 3.30GHz (Ivy Bridge), 1 CPU, 12 logical and 6 physical cores” according to BenchmarkDotNet) using .NET 4.7.2:
Things go smoothly up to about 800K/sec, then fall apart at 1000K/sec. It looks like the maximum stable rate is thus ~800K/sec. Next question: can we improve on this result by using more threads?
For my computer, a perfectly parallel hyperthreaded workload should consume 12 seconds of CPU time every second for 100% utilization (12 threads x 1 sec CPU time per thread). The results above show that we only began to approach about 84% utilization of 2 threads (roughly 1.7 seconds of CPU time per second), which implies we have quite a bit of headroom before our theoretical limit. So let's try varying the number of threads. We'll reserve 1 thread for the producer, which leaves a maximum of 11 threads for the consumers. Assuming that we use the same stable send rate of 800K/sec and that each receiver can exactly match it, we can gather the results for 2, 4, 6, 8, and 11 threads.
Well, that’s no good… adding more threads not only doesn’t help the situation but actually reduces the achieved operation rate. At no point are we ever able to use more than about 34% of available CPU time. What is happening here? The answer is: Amdahl’s law!
Put simply, the theoretical performance improvement of adding more threads can only be achieved if there is enough parallelizable work. In this case, our program appears to be dominated by serialized execution steps, probably due to contention between readers and writers of the ConcurrentQueue, since in effect only one queue operation can make progress at a time.
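To put a formula behind that: Amdahl's law says that if only a fraction P of the work can be parallelized, the best possible speedup on N threads is 1 / ((1 - P) + P / N). A quick sketch (the 30% figure below is purely illustrative, not a measurement of this program):

// Amdahl's law: best-case speedup on n threads when fraction p of the work is parallelizable.
static double Speedup(double p, int n) => 1.0 / ((1.0 - p) + (p / n));

// With an illustrative p = 0.3:
//   Speedup(0.3, 2)  ~= 1.18
//   Speedup(0.3, 11) ~= 1.38
//   As n grows, the limit is 1 / 0.7 ~= 1.43, so extra threads quickly stop paying off.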
These results should encourage you to be cautious before nonchalantly adding more threads in an effort to improve throughput. The speedup you expect to see will only be realized if there is sufficient parallelizable work, such as real per-operation latency, to begin with.
I believe there is the following bug: if many items are enqueued, the data-available event will transition to the signaled state only once. Then only a single thread might be woken up, although many items are available for processing. This might affect the benchmarks that use multiple threads.
That is true. This code sample is based on similar code I’ve seen in real systems, and the same issues exist there. With a high enough arrival rate and operation latency, it might be less noticeable; the authors of these systems may even consider this a feature and not a bug. 🙂
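For what it's worth, a counting signal would avoid relying on a single event transition, since every Add then wakes exactly one waiter. A rough sketch of the changed parts using SemaphoreSlim (one possible fix, not the code above):

// Sketch: a counting semaphore records one release per enqueued item, so no wakeups are lost.
private readonly SemaphoreSlim dataAvailable = new SemaphoreSlim(0);
private readonly CancellationTokenSource canceled = new CancellationTokenSource();

public void Add(T item)
{
    this.queue.Enqueue(item);
    this.dataAvailable.Release();
}

private void Receive(object objThread)
{
    int thread = (int)objThread;
    try
    {
        while (true)
        {
            // One successful Wait per enqueued item; throws when canceled.
            this.dataAvailable.Wait(this.canceled.Token);
            if (this.queue.TryDequeue(out T data))
            {
                this.onReceive(thread, data);
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
}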
The event is signaled on every added item. If a large number of items arrives, all threads will be signaled. Threads only go back to waiting when the queue is empty. The “bug” will never happen.