{"id":5567,"date":"2018-11-11T14:00:10","date_gmt":"2018-11-11T14:00:10","guid":{"rendered":"http:\/\/writeasync.net\/?p=5567"},"modified":"2018-11-04T03:49:24","modified_gmt":"2018-11-04T03:49:24","slug":"more-performance-experiments-queues-and-threads","status":"publish","type":"post","link":"http:\/\/writeasync.net\/?p=5567","title":{"rendered":"More performance experiments: queues and threads"},"content":{"rendered":"<p>Continuing from our previous <a href=\"http:\/\/writeasync.net\/?p=5555\">performance experiment<\/a>, I would like to see if there are any easy optimizations to apply to squeeze more throughput out of this producer\/consumer queue.<\/p>\n<p>One possible angle of attack is to replace the implicit synchronization primitives with a data structure that already takes care of this aspect for us. As it turns out, <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/api\/system.collections.concurrent.blockingcollection-1?view=netframework-4.7.2#remarks\">BlockingCollection<\/a> already uses a <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/api\/system.collections.concurrent.concurrentqueue-1?view=netframework-4.7.2\">ConcurrentQueue<\/a> internally and is highly optimized for the contentious reader\/writer pattern we are interested in. Let&#8217;s see if the results are more favorable.<\/p>\n<p>Instead of just replacing <code>WorkerQueue<\/code> outright, we can rename it to <code>WorkerQueue1<\/code> and create a new <code>WorkerQueue2<\/code> with our new pattern. We should also prepare for easily swapping out implementations by creating an interface <code>IWorkerQueue<\/code>. Basically we now have this:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n    public sealed class WorkerQueue1&lt;T&gt; : IWorkerQueue&lt;T&gt;\r\n    {\r\n        \/\/ . . 
.\r\n    }\r\n\r\n    public interface IWorkerQueue&lt;T&gt; : IDisposable\r\n    {\r\n        void Add(T item);\r\n    }\r\n<\/pre>\n<p>The tests also need an overhaul since they were taking advantage of some implementation details of the synchronization. Here are the revised tests, using the <a href=\"http:\/\/wiki.c2.com\/?AbstractTest\">abstract test pattern<\/a>:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n\r\n    &#x5B;TestClass]\r\n    public abstract class WorkerQueueTest\r\n    {\r\n        &#x5B;TestMethod]\r\n        public void OneThread()\r\n        {\r\n            List&lt;int&gt; received = new List&lt;int&gt;();\r\n\r\n            using (IWorkerQueue&lt;int&gt; queue = this.Create(1, (t, i) =&gt; received.Add(i + t)))\r\n            {\r\n                for (int i = 0; i &lt; 1000; ++i)\r\n                {\r\n                    queue.Add(i);\r\n                }\r\n\r\n                while (received.Count == 0)\r\n                {\r\n                }\r\n            }\r\n\r\n            int count = received.Count;\r\n            count.Should().BeGreaterThan(0);\r\n            received.Should().ContainInOrder(Enumerable.Range(0, count));\r\n        }\r\n\r\n        &#x5B;TestMethod]\r\n        public void TwoThreads()\r\n        {\r\n            int sum = 0;\r\n\r\n            using (IWorkerQueue&lt;int&gt; queue = this.Create(2, (_, i) =&gt; Interlocked.Add(ref sum, i)))\r\n            {\r\n                for (int i = 1; i &lt;= 1000; ++i)\r\n                {\r\n                    queue.Add(i);\r\n                }\r\n\r\n                while (Volatile.Read(ref sum) != 500500)\r\n                {\r\n                }\r\n            }\r\n\r\n            sum.Should().Be(500500); \/\/ 1 + 2 + ... 
+ 999 + 1000\r\n        }\r\n\r\n        &#x5B;TestMethod]\r\n        public void FourThreads()\r\n        {\r\n            int sum = 0;\r\n\r\n            using (IWorkerQueue&lt;int&gt; queue = this.Create(4, (_, i) =&gt; Interlocked.Add(ref sum, i)))\r\n            {\r\n                for (int i = 1; i &lt;= 1000; ++i)\r\n                {\r\n                    queue.Add(i);\r\n                }\r\n\r\n                while (Volatile.Read(ref sum) != 500500)\r\n                {\r\n                }\r\n            }\r\n\r\n            sum.Should().Be(500500); \/\/ 1 + 2 + ... + 999 + 1000\r\n        }\r\n\r\n        &#x5B;TestMethod]\r\n        public void TwoThreadsOneBlocked()\r\n        {\r\n            List&lt;int&gt; received = new List&lt;int&gt;();\r\n            ManualResetEventSlim block = new ManualResetEventSlim();\r\n            Action&lt;int, int&gt; onReceive = (t, i) =&gt;\r\n            {\r\n                if (t == 0)\r\n                {\r\n                    block.Wait();\r\n                }\r\n                else\r\n                {\r\n                    received.Add(i);\r\n                }\r\n            };\r\n\r\n            using (block)\r\n            using (IWorkerQueue&lt;int&gt; queue = this.Create(2, onReceive))\r\n            {\r\n                for (int i = 0; i &lt;= 1000; ++i)\r\n                {\r\n                    queue.Add(i);\r\n                }\r\n\r\n                while (received.Count != 1000)\r\n                {\r\n                }\r\n\r\n                block.Set();\r\n            }\r\n\r\n            \/\/ Either 0 or 1 could be consumed by the blocked receiver, but all other\r\n            \/\/ values (2 .. 
1000) should be read in order by the other receiver.\r\n            received.Should().HaveCount(1000).And.ContainInOrder(Enumerable.Range(2, 999));\r\n        }\r\n\r\n        &#x5B;TestMethod]\r\n        public void TwoThreadsBothBlocked()\r\n        {\r\n            int received = 0;\r\n            ManualResetEventSlim block = new ManualResetEventSlim();\r\n            Action&lt;int, int&gt; onReceive = (t, i) =&gt;\r\n            {\r\n                Interlocked.Add(ref received, i + t);\r\n                block.Wait();\r\n            };\r\n\r\n            using (block)\r\n            using (IWorkerQueue&lt;int&gt; queue = this.Create(2, onReceive))\r\n            {\r\n                for (int i = 1; i &lt;= 1000; ++i)\r\n                {\r\n                    queue.Add(i);\r\n                }\r\n\r\n                while (Volatile.Read(ref received) != 4)\r\n                {\r\n                }\r\n\r\n                block.Set();\r\n            }\r\n\r\n            \/\/ Should receive at least (1 + 2) + (0 + 1); additional values may be present.\r\n            received.Should().BeGreaterOrEqualTo(4);\r\n        }\r\n\r\n        protected abstract IWorkerQueue&lt;int&gt; Create(int threads, Action&lt;int, int&gt; onReceive);\r\n    }\r\n<\/pre>\n<p>Note the abstract factory method which needs to be implemented by the concrete test classes. 
Here are those classes now:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n    &#x5B;TestClass]\r\n    public sealed class WorkerQueue1Test : WorkerQueueTest\r\n    {\r\n        protected override IWorkerQueue&lt;int&gt; Create(int threads, Action&lt;int, int&gt; onReceive)\r\n        {\r\n            return new WorkerQueue1&lt;int&gt;(threads, onReceive);\r\n        }\r\n    }\r\n\r\n    &#x5B;TestClass]\r\n    public sealed class WorkerQueue2Test : WorkerQueueTest\r\n    {\r\n        protected override IWorkerQueue&lt;int&gt; Create(int threads, Action&lt;int, int&gt; onReceive)\r\n        {\r\n            return new WorkerQueue2&lt;int&gt;(threads, onReceive);\r\n        }\r\n    }\r\n<\/pre>\n<p>And finally, the implementation of <code>WorkerQueue2<\/code> which ditches the <code>Auto\/ManualResetEvent<\/code> logic and embraces a more modern <code>Task<\/code>-oriented design:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n    using System;\r\n    using System.Collections.Concurrent;\r\n    using System.Threading;\r\n    using System.Threading.Tasks;\r\n\r\n    public sealed class WorkerQueue2&lt;T&gt; : IWorkerQueue&lt;T&gt;\r\n    {\r\n        private readonly BlockingCollection&lt;T&gt; queue;\r\n        private readonly Action&lt;int, T&gt; onReceive;\r\n        private readonly CancellationTokenSource cts;\r\n        private readonly Task&#x5B;] tasks;\r\n\r\n        public WorkerQueue2(int threads, Action&lt;int, T&gt; onReceive)\r\n        {\r\n            this.queue = new BlockingCollection&lt;T&gt;();\r\n            this.onReceive = onReceive;\r\n            this.cts = new CancellationTokenSource();\r\n            this.tasks = new Task&#x5B;threads];\r\n\r\n            for (int i = 0; i &lt; threads; ++i)\r\n            {\r\n                int index = i;\r\n                this.tasks&#x5B;index] = Task.Run(() =&gt; this.Receive(index, cts.Token));\r\n            }\r\n        }\r\n\r\n        public 
void Add(T item)\r\n        {\r\n            this.queue.Add(item);\r\n        }\r\n\r\n        public void Dispose()\r\n        {\r\n            this.queue.CompleteAdding();\r\n            this.cts.Cancel();\r\n            Task.WaitAll(this.tasks);\r\n            this.cts.Dispose();\r\n            this.queue.Dispose();\r\n        }\r\n\r\n        private void Receive(int thread, CancellationToken token)\r\n        {\r\n            try\r\n            {\r\n                while (!token.IsCancellationRequested)\r\n                {\r\n                    if (this.queue.TryTake(out T data, -1, token))\r\n                    {\r\n                        this.onReceive(thread, data);\r\n                    }\r\n                }\r\n            }\r\n            catch (OperationCanceledException)\r\n            {\r\n            }\r\n        }\r\n    }\r\n<\/pre>\n<p>We will again ask our initial performance question: <strong>what is the maximum send rate we can achieve without overwhelming the consumer(s)?<\/strong><\/p>\n<p><a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2200000_2200000.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2200000_2200000-300x181.png\" alt=\"\" width=\"300\" height=\"181\" class=\"alignnone size-medium wp-image-5568\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2200000_2200000-300x181.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2200000_2200000.png 480w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p><a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2300000_2300000.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2300000_2300000-300x181.png\" alt=\"\" width=\"300\" height=\"181\" class=\"alignnone size-medium wp-image-5569\" 
srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2300000_2300000-300x181.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2300000_2300000.png 480w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p><a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2400000_2400000.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2400000_2400000-300x181.png\" alt=\"\" width=\"300\" height=\"181\" class=\"alignnone size-medium wp-image-5570\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2400000_2400000-300x181.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2400000_2400000.png 480w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p><a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2500000_2500000.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2500000_2500000-300x181.png\" alt=\"\" width=\"300\" height=\"181\" class=\"alignnone size-medium wp-image-5571\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2500000_2500000-300x181.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2500000_2500000.png 480w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p><a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2600000_2600000.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2600000_2600000-300x181.png\" alt=\"\" width=\"300\" height=\"181\" class=\"alignnone size-medium wp-image-5572\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2600000_2600000-300x181.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2018\/11\/Perf2_1_2600000_2600000.png 480w\" sizes=\"auto, (max-width: 300px) 100vw, 
300px\" \/><\/a><\/p>\n<p>Here we can start to see the limits of the accuracy of <a href=\"http:\/\/writeasync.net\/?p=5548\">the throttle<\/a>. We ostensibly send at a rate of 2200K\/sec, 2300K\/sec, 2400K\/sec, etc. but the maximum throughput achieved is actually around 2220K\/sec. The attempt to send at 2600K\/sec fails horribly and overwhelms the receiver. So we can conclude that 2200K\/sec is the upper bound; this is a full 2.75 times faster than our first cut. Not bad, <code>BlockingCollection<\/code>!<\/p>\n<p>One more performance question for today: <strong>what is the hottest path here?<\/strong> That is, what is contributing to the scalability limit we are hitting? This sounds like a job for the <a href=\"https:\/\/docs.microsoft.com\/en-us\/visualstudio\/profiling\/profiling-feature-tour?view=vs-2017\">Visual Studio profiler<\/a>.<\/p>\n<p>First we need to make our application more profiler friendly. Let&#8217;s extract a few methods so that we can start a default scenario when zero args are passed:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n        private static void Main(string&#x5B;] args)\r\n        {\r\n            if (args.Length == 0)\r\n            {\r\n                RunProfile();\r\n            }\r\n            else\r\n            {\r\n                RunNormal(args);\r\n            }\r\n        }\r\n\r\n        private static void RunProfile()\r\n        {\r\n            Rate rate = Rate.PerSecond(2200000.0d);\r\n            TimeSpan duration = TimeSpan.FromSeconds(10.0d);\r\n            RunQueue(1, rate, rate, duration);\r\n        }\r\n\r\n        \/\/ . . .\r\n<\/pre>\n<p>Now we go to the ALT+F2 menu, select &#8220;CPU Usage&#8221; and off we go. 
After about 10 seconds, the generated report comes up with these results:<\/p>\n<table>\n<tr>\n<th>Function Name<\/th>\n<th>Total CPU [ms, %]<\/th>\n<th>Self CPU [ms, %]<\/th>\n<th>Module<\/th>\n<\/tr>\n<tr>\n<td><strong>QueuePerf.exe (PID: 11620)<\/strong><\/td>\n<td>14685 (100.00%)<\/td>\n<td>14685 (100.00%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.WorkerQueue2`1[System.Int64]::Receive<\/td>\n<td>7976 (54.31%)<\/td>\n<td>255 (1.74%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.Operations::Send<\/td>\n<td>6471 (44.07%)<\/td>\n<td>567 (3.86%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>[External Call] System.Collections.Concurrent.BlockingCollection`1[System.Int64]::TryTakeWithNoTimeValidation<\/td>\n<td>4778 (32.54%)<\/td>\n<td style='background-color: yellow;'>4778 (32.54%)<\/td>\n<td>Multiple modules<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.Throttle::Wait<\/td>\n<td>4054 (27.61%)<\/td>\n<td>524 (3.57%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.WorkerQueue2`1[System.Int64]::Add<\/td>\n<td>3943 (26.85%)<\/td>\n<td>18 (0.12%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>[External Call] System.Collections.Concurrent.BlockingCollection`1[System.Int64]::TryAddWithNoTimeValidation<\/td>\n<td>3917 (26.67%)<\/td>\n<td style='background-color: yellow;'>3917 (26.67%)<\/td>\n<td>Multiple modules<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.RealClock::get_Elapsed<\/td>\n<td>3337 (22.72%)<\/td>\n<td>75 (0.51%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>[External Call] System.Diagnostics.Stopwatch.GetElapsedDateTimeTicks()$##60030C4<\/td>\n<td>3262 (22.21%)<\/td>\n<td style='background-color: yellow;'>3262 (22.21%)<\/td>\n<td>Multiple modules<\/td>\n<\/tr>\n<tr>\n<td>QueuePerf.Program+&lt;&gt;c__DisplayClass3_0::&lt;RunQueue&gt;b__0<\/td>\n<td>2892 (19.69%)<\/td>\n<td>796 (5.42%)<\/td>\n<td>QueuePerf.exe<\/td>\n<\/tr>\n<tr>\n<td>[External Call] System.TimeSpan.Subtract(System.TimeSpan)$##6001355<\/td>\n<td>123 
(0.84%)<\/td>\n<td>123 (0.84%)<\/td>\n<td>mscorlib.ni.dll<\/td>\n<\/tr>\n<\/table>\n<p>I&#8217;ve <span style='background-color: yellow;'>highlighted<\/span> the top three most expensive calls, ranked by <a href=\"https:\/\/blogs.msdn.microsoft.com\/profiler\/2004\/06\/09\/what-are-exclusive-and-inclusive\/\">exclusive (self) CPU time<\/a>. Two of them are the expected TryAdd\/TryTake methods from BlockingCollection itself. The third is a Stopwatch method invoked by the Throttle. It seems that we are wasting quite a bit of time just calculating how long we should wait and rarely actually waiting. If I were so inclined, I could act on this valuable data and build a higher-performance throttle; for instance, it could scale down the frequency of its time calculations as the operation rate increases. I&#8217;ll save that for another time, but suffice it to say that a profiler is an essential tool when analyzing performance results. Sometimes the measurement methodology itself is part of the bottleneck!<\/p>\n<p>That wraps up today&#8217;s investigation. But we haven&#8217;t yet seen the last of the WorkerQueue&#8230;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Continuing from our previous performance experiment, I would like to see if there are any easy optimizations to apply to squeeze more throughput out of this producer\/consumer queue. 
One possible angle of attack is to replace the implicit synchronization primitives&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[61,104,51],"tags":[],"class_list":["post-5567","post","type-post","status-publish","format-standard","hentry","category-concurrency","category-performance","category-testing"],"_links":{"self":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5567","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5567"}],"version-history":[{"count":5,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5567\/revisions"}],"predecessor-version":[{"id":5577,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5567\/revisions\/5577"}],"wp:attachment":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5567"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5567"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5567"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}