{"id":1511,"date":"2014-01-29T13:00:43","date_gmt":"2014-01-29T13:00:43","guid":{"rendered":"http:\/\/writeasync.net\/?p=1511"},"modified":"2014-01-27T10:15:37","modified_gmt":"2014-01-27T10:15:37","slug":"non-blockingcollection","status":"publish","type":"post","link":"http:\/\/writeasync.net\/?p=1511","title":{"rendered":"InputQueue, the non-BlockingCollection"},"content":{"rendered":"<p>The .NET 4.0+ solution to the <a href=\"http:\/\/en.wikipedia.org\/wiki\/Producer-consumer_problem\">producer-consumer problem<\/a> is <a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/dd267312(v=vs.110).aspx\"><code>BlockingCollection<\/code><\/a>. A sample app with a single producer and consumer using an ordered queue of integers would look something like this:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nprivate static async Task EnqueueLoopAsync(BlockingCollection&lt;int&gt; queue, CancellationToken token)\r\n{\r\n    int i = 0;\r\n    while (!token.IsCancellationRequested)\r\n    {\r\n        ++i;\r\n        queue.Add(i);\r\n        await Task.Delay(1);\r\n    }\r\n}\r\n\r\nprivate static void DequeueLoop(BlockingCollection&lt;int&gt; queue, CancellationToken token)\r\n{\r\n    int previous = 0;\r\n    try\r\n    {\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            int current = queue.Take(token);\r\n            if (current - previous != 1)\r\n            {\r\n                throw GetOutOfOrderError(current, previous);\r\n            }\r\n\r\n            previous = current;\r\n        }\r\n    }\r\n    catch (OperationCanceledException)\r\n    {\r\n    }\r\n}\r\n<\/pre>\n<p>This is simple enough, but the problem is that <code>DequeueLoop<\/code>. It spends a lot of time (relatively speaking) blocking the thread due to the synchronous <a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/dd381908(v=vs.110).aspx\"><code>Take<\/code><\/a> method. About the best you can do, async-wise, is use <a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/dd287154(v=vs.110).aspx\"><code>TryTake<\/code><\/a> with a small timeout and yield a bit on empty, for example:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nprivate static async Task DequeueLoopAsync(BlockingCollection&lt;int&gt; queue, CancellationToken token)\r\n{\r\n    int previous = 0;\r\n    try\r\n    {\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            int current;\r\n            if (queue.TryTake(out current, 1))\r\n            {\r\n                if (current - previous != 1)\r\n                {\r\n                    throw GetOutOfOrderError(current, previous);\r\n                }\r\n\r\n                previous = current;\r\n            }\r\n            else\r\n            {\r\n                await Task.Delay(1);\r\n            }\r\n        }\r\n    }\r\n    catch (OperationCanceledException)\r\n    {\r\n    }\r\n}\r\n<\/pre>\n<p>Certainly not ideal, but what did we expect from a <code><em>Blocking<\/em>Collection<\/code> anyway?<\/p>\n<p>For true async support, we&#8217;ll need to come up with a different data structure. And this is exactly what I did in my new <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/tree\/master\/projects\/QueueSample\">QueueSample project<\/a>.<\/p>\n<p>The main class of interest is <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/blob\/master\/projects\/QueueSample\/source\/QueueSample.Core\/InputQueue.cs\"><code>InputQueue<\/code><\/a>. (If you&#8217;ve seen my earlier blog posts, you might note some <a href=\"http:\/\/writeasync.net\/?p=211\" title=\"TDD + async: Introducing MemoryChannel\">similarities to <code>MemoryChannel<\/code><\/a>.) Its interface is as follows:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic class InputQueue&lt;T&gt; : IInputQueue&lt;T&gt;\r\n{\r\n    public InputQueue()\r\n    public Task&lt;T&gt; DequeueAsync();\r\n    public void Enqueue(T item);\r\n    public void Dispose();\r\n}\r\n<\/pre>\n<p>Some implementation notes:<\/p>\n<ul>\n<li>For simplicity, <code>InputQueue<\/code> allows many producers but only a <em>single<\/em> consumer at a time.<\/code><\/li>\n<li>Again for simplicity, the only way to cancel a pending <code>DequeueAsync<\/code> operation is to <code>Dispose()<\/code> the queue.\n<li><code>Enqueue<\/code> can complete a pending <code>DequeueAsync<\/code> operation which means that it could run synchronous continuations on the calling thread. Normally this isn&#8217;t a problem except in cases where you have a long chain of tasks that all complete synchronously. See this <a href=\"http:\/\/stackoverflow.com\/questions\/12693046\/configuring-the-continuation-behaviour-of-a-taskcompletionsources-task\">StackOverflow question<\/a> and <a href=\"http:\/\/blogs.msdn.com\/b\/pfxteam\/archive\/2012\/02\/11\/10266920.aspx\">Stephen Toub&#8217;s blog post<\/a> for more information.\n<\/ul>\n<p>With this true async dequeue method at our disposal, we can rewrite our consumer like so:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nprivate static async Task DequeueLoopAsync(IInputQueue&lt;int&gt; queue, CancellationToken token)\r\n{\r\n    int previous = 0;\r\n    try\r\n    {\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            int current = await queue.DequeueAsync();\r\n            if (current - previous != 1)\r\n            {\r\n                throw GetOutOfOrderError(current, previous);\r\n            }\r\n\r\n            previous = current;\r\n        }\r\n    }\r\n    catch (ObjectDisposedException)\r\n    {\r\n    }\r\n}\r\n<\/pre>\n<p>In a later post, I&#8217;ll talk about more ways we can use this <em>non<\/em>-blocking collection.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>The .NET 4.0+ solution to the producer-consumer problem is BlockingCollection. A sample app with a single producer and consumer using an ordered queue of integers would look something like this: private static async Task EnqueueLoopAsync(BlockingCollection&lt;int&gt; queue, CancellationToken token) { int&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21,61,41],"tags":[],"class_list":["post-1511","post","type-post","status-publish","format-standard","hentry","category-async","category-concurrency","category-tdd"],"_links":{"self":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/1511","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1511"}],"version-history":[{"count":0,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/1511\/revisions"}],"wp:attachment":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1511"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1511"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1511"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}