{"id":311,"date":"2013-12-11T13:00:47","date_gmt":"2013-12-11T13:00:47","guid":{"rendered":"http:\/\/writeasync.net\/?p=311"},"modified":"2013-11-26T17:20:40","modified_gmt":"2013-11-26T17:20:40","slug":"memorychannel-and-concurrency","status":"publish","type":"post","link":"http:\/\/writeasync.net\/?p=311","title":{"rendered":"MemoryChannel and concurrency"},"content":{"rendered":"<p>In a <a title=\"TDD + async: Introducing MemoryChannel\" href=\"http:\/\/writeasync.net\/?p=211\">previous post<\/a>, I described <code>MemoryChannel<\/code> and how I used TDD to implement it. After <a title=\"&quot;Dispose completes pending receive and causes subsequent send and receive to throw ObjectDisposed&quot;\" href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/4f739a14e3833fdf7c2ebc1e56c6b4e5103ccfd7\">the 25th commit<\/a>, 20 unit tests later, I had a fully functional <em>but single-threaded<\/em> implementation.<\/p>\n<p>Simply by inspection, I knew that this code would fail almost immediately if a sender and receiver were ever executed on concurrent threads. For example, <code>Send<\/code> and <code>ReceiveAsync<\/code> can potentially modify the <code>excessBuffers<\/code> <a title=\"LinkedList&lt;T&gt; Class\" href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/he2s3bh7(v=vs.110).aspx\">LinkedList<\/a> concurrently which may corrupt its internal state.<\/p>\n<p>I figured that a thread-safe <code>MemoryChannel<\/code> would be generally more useful, if a bit more complex. Thus I began devising an integration test application, knowing that my unit tests could not be of much help here.<\/p>\n<p>I started by writing a simple skeleton test app with a <code>Receiver<\/code> class (async receive loop) and a <code>Sender<\/code> class (background thread send loop). The test app started out like this:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nLogger logger = new Logger();\r\nMemoryChannel channel = new MemoryChannel();\r\n\r\nReceiver receiver = new Receiver(channel, logger, 16);\r\nSender sender = new Sender(channel, logger, 16, 1);\r\n\r\nTask receiverTask = receiver.RunAsync();\r\nTask senderTask = sender.RunAsync();\r\n\r\nTask.WaitAll(receiverTask, senderTask);\r\n\r\nchannel.Dispose();\r\n\r\nlogger.WriteLine(&quot;Done.&quot;);\r\n<\/pre>\n<p><a title=\"&quot;Implement basic receiver loop&quot;\" href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/b62bf367206c774e43830d674b44b77078334f66\">At this point<\/a>, I had only implemented the receive loop &#8212; the <code>Sender<\/code> was just a single Send operation running on a background thread. But this was enough to <em>consistently<\/em> trigger an invalid state:<\/p>\n<p><code>System.InvalidOperationException: A receive operation is already in progress.<br \/>\nat CommSample.MemoryChannel.ReceiveAsync(Byte[] buffer) in CommSample\\source\\CommSample.Core\\MemoryChannel.cs:line 39<br \/>\nat CommSample.Receiver.&lt;RunAsync&gt;d__0.MoveNext() in CommSample\\source\\CommSample.App\\Receiver.cs:line 34<br \/>\n--- End of inner exception stack trace ---<br \/>\nat System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)<br \/>\nat System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)<br \/>\nat System.Threading.Tasks.Task.WaitAll(Task[] tasks)<br \/>\nat CommSample.Program.Main(String[] args) in CommSample\\source\\CommSample.App\\Program.cs:line 24<\/code><\/p>\n<p>Using the debugger and setting a breakpoint on the exception condition in <code>MemoryChannel.ReceiveAsync<\/code>, we can get a more useful stack trace that illustrates the problem:<\/p>\n<ul>\n<li><code>CommSample.Core.dll!CommSample.MemoryChannel.ReceiveAsync(byte[] buffer)<\/code> Line 36<\/li>\n<\/li>\n<li><code>CommSample.App.exe!CommSample.Receiver.RunAsync()<\/code> Line 34<\/li>\n<li>[Resuming Async Method]<\/li>\n<li><em>[ . . . ]<\/em><\/li>\n<li><code>mscorlib.dll!System.Threading.Tasks.TaskCompletionSource&lt;int&gt;.SetResult(int result)<\/code><\/li>\n<li><code>CommSample.Core.dll!CommSample.MemoryChannel.ReceiveRequest.TryComplete(bool disposing)<\/code> Line 161<\/li>\n<li><code>CommSample.Core.dll!CommSample.MemoryChannel.Send(byte[] buffer)<\/code> Line 75<\/li>\n<li><code>CommSample.App.exe!CommSample.Sender.RunInner()<\/code> Line 41<\/li>\n<li><em>[ . . . ]<\/em><\/li>\n<li><code>mscorlib.dll!System.Threading.ThreadHelper.ThreadStart(object obj)<\/code><\/li>\n<\/ul>\n<p>Ah, so it is the <strong>synchronous continuation execution<\/strong> triggered by <code>TaskCompletionSource.SetResult<\/code> that is biting us here! To help explain further, let&#8217;s make a small modification to the <code>Receiver<\/code> loop:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\ndo\r\n{\r\n    this.logger.WriteLine(&quot;Before await...&quot;);\r\n    try\r\n    {\r\n        bytesRead = await this.channel.ReceiveAsync(buffer);\r\n    }\r\n    catch (Exception e)\r\n    {\r\n        this.logger.WriteLine(&quot;Receive threw: {0}&quot;, e);\r\n        throw;\r\n    }\r\n\r\n    this.logger.WriteLine(&quot;After await...&quot;);\r\n    totalBytes += bytesRead;\r\n}\r\nwhile (bytesRead &gt; 0);\r\n<\/pre>\n<p>Running the app now results in the following output:<\/p>\n<p><code>[0000.004\/T01] Before await...<br \/>\n[0000.008\/T01] Sender B=16\/F=0x1 starting...<br \/>\n[0000.010\/T03] After await...<br \/>\n[0000.010\/T03] Before await...<br \/>\n[0000.012\/T03] Receive threw: System.InvalidOperationException: A receive operation is already in progress.<br \/>\n   at CommSample.MemoryChannel.ReceiveAsync(Byte[] buffer) in CommSample.Core\\MemoryChannel.cs:line 39<br \/>\n   at CommSample.Receiver.<RunAsync>d__0.MoveNext() in CommSample\\source\\CommSample<br \/>\n.App\\Receiver.cs:line 37<br \/>\n[0000.012\/T03] Sender B=16\/F=0x1 completed. Sent 16 bytes.<\/code><\/p>\n<p>As expected, the first <code>await<\/code> we hit returns control back to the caller with one receive now in progress. Note that the code <em>after<\/em> the first <code>await<\/code> and <em>before<\/em> the next <code>await<\/code> is scheduled as a continuation at this point. The sender then runs which fulfills the pending receive. This triggers the continuation, which in turn raises an exception since the code to set <code>pendingReceive<\/code> to <code>null<\/code> hasn&#8217;t had a chance to run yet.<\/p>\n<p>Technically this issue could be induced by a single-threaded unit test with some clever use of <a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/dd270696(v=vs.110).aspx\" title=\"Task.ContinueWith Method (Action&lt;Task&gt;)\">ContinueWith<\/a>. However, the &#8220;fix&#8221; one would likely make in response would not solve the broad issue of race conditions inherent in the code at this point.<\/p>\n<p>Instead, I forged ahead and reorganized the code to eliminate thread-safety issues. I started by <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/ea52a9bfac7ccb8fb42818c983ed7edb8c78930e\" title=\"&quot;Add locking to receive&quot;\">adding locking to the receive logic<\/a>. I noted that <code>excessBuffers<\/code>, being a reference type and a read-only private instance field, would work fine as a lock object. I moved all of the logic of this method under this lock, save for the final conditional logic to complete the result (to prevent deadlocks, you should generally avoid running arbitrary user code such as event handlers or in this case, continuations, under a lock).<\/p>\n<p>This was a fine start but it was only the beginning. With locking, <em>everyone<\/em> must participate in order to ensure thread-safety. To complete the implementation, I added locking to <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/be3abffb31b0a5f681575b23258e79e62bd62690\" title=\"&quot;Add locking to send&quot;\">send<\/a> (with one <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/b6e3aeb623670eeed99c72ede8847e1ca34d4afe\" title=\"&quot;Correct locking in send&quot;\">later fix<\/a>) and <a href=\"https:\/\/github.com\/brian-dot-net\/writeasync\/commit\/29f712bef7ff32a8c24ba902b5dbe4d8779cc659\" title=\"&quot;Add locking to Dispose&quot;\">dispose<\/a>.<\/p>\n<p>At this point, I was ready to complete the real integration test application. The story continues in the next post&#8230;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In a previous post, I described MemoryChannel and how I used TDD to implement it. After the 25th commit, 20 unit tests later, I had a fully functional but single-threaded implementation. Simply by inspection, I knew that this code would&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21,61],"tags":[],"class_list":["post-311","post","type-post","status-publish","format-standard","hentry","category-async","category-concurrency"],"_links":{"self":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/311","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=311"}],"version-history":[{"count":0,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/311\/revisions"}],"wp:attachment":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=311"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=311"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=311"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}