{"id":5261,"date":"2015-12-16T13:00:04","date_gmt":"2015-12-16T13:00:04","guid":{"rendered":"http:\/\/writeasync.net\/?p=5261"},"modified":"2015-12-17T04:22:25","modified_gmt":"2015-12-17T04:22:25","slug":"efficient-concurrency-prevention","status":"publish","type":"post","link":"https:\/\/writeasync.net\/?p=5261","title":{"rendered":"Efficient concurrency prevention"},"content":{"rendered":"<p>Sometimes you want <a href=\"http:\/\/stackoverflow.com\/questions\/4844637\/what-is-the-difference-between-concurrency-parallelism-and-asynchronous-methods\">asynchrony but not concurrency<\/a>. For example, if you are writing data to a file, you should generally prefer asynchronous I\/O, but you probably don&#8217;t want 10 other competing callers to corrupt the contents.<\/p>\n<p>Perhaps the simplest way is to just protect the call with a lock, e.g.:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic class MyFile\r\n{\r\n    private readonly object syncRoot;\r\n    private bool writing;\r\n    \/\/ ...\r\n\r\n    public MyFile()\r\n    {\r\n        this.syncRoot = new object();\r\n    }\r\n\r\n    public async Task WriteAsync(byte&#x5B;] data)\r\n    {\r\n        lock (this.syncRoot)\r\n        {\r\n           if (writing)\r\n           {\r\n               throw new InvalidOperationException(&quot;Conflict!&quot;);\r\n           }\r\n\r\n           writing = true;\r\n        }\r\n\r\n        try\r\n        {\r\n            \/\/ ... do work here...\r\n        }\r\n        finally\r\n        {\r\n            lock (this.syncRoot)\r\n            {\r\n                writing = false;\r\n            }\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>This works fine and is simple enough. But that lock might be overkill. <a href=\"http:\/\/www.informit.com\/guides\/content.aspx?g=dotnet&#038;seqNum=600\">A lock can be expensive<\/a>, especially when you always force the caller to acquire it as above. 
Ideally, you don&#8217;t want to make the correct use case significantly slower just to account for possible incorrect usages. (But of course, don&#8217;t just take my word for it &#8212; <a href=\"https:\/\/www.facebook.com\/notes\/facebook-engineering\/three-optimization-tips-for-c\/10151361643253920\">measure it and see<\/a>!)<\/p>\n<p>Let me introduce a little class that can simplify this type of concurrency prevention <em>without<\/em> locks. Additionally, it assumes that the operation lifetime is managed by TaskCompletionSource; it could, for example, be used with the <a href=\"http:\/\/writeasync.net\/?p=5191\">VirtualDisk code in my previous post<\/a> where the bulk of the work is actually happening elsewhere in the Windows kernel.<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic class AsyncOperation\r\n{\r\n    private TaskCompletionSource&lt;bool&gt; tcs;\r\n\r\n    public Task Start()\r\n    {\r\n        TaskCompletionSource&lt;bool&gt; current = new TaskCompletionSource&lt;bool&gt;();\r\n        if (Interlocked.CompareExchange(ref this.tcs, current, null) != null)\r\n        {\r\n            throw new InvalidOperationException(&quot;An operation is already in progress.&quot;);\r\n        }\r\n\r\n        return current.Task;\r\n    }\r\n\r\n    public void Complete()\r\n    {\r\n        TaskCompletionSource&lt;bool&gt; current = Interlocked.Exchange(ref this.tcs, null);\r\n        if (current == null)\r\n        {\r\n            \/\/ Programming error!!\r\n            throw new InvalidOperationException(&quot;There is no active operation.&quot;);\r\n        }\r\n\r\n        current.SetResult(false);\r\n    }\r\n}\r\n<\/pre>\n<p>That&#8217;s it &#8212; just a few interlocked operations and we&#8217;re good to go. (The &#8220;complete with exception&#8221; path has been elided for brevity, but it is not too difficult to imagine how it would look.)<\/p>\n<p>But does it work? 
Here are some unit tests showing the basic state coverage:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n&#x5B;TestClass]\r\npublic class AsyncOperationTest\r\n{\r\n    &#x5B;TestMethod]\r\n    public void ShouldReturnTaskRepresentingOperationOnStart()\r\n    {\r\n        AsyncOperation operation = new AsyncOperation();\r\n\r\n        Task task = operation.Start();\r\n\r\n        Assert.AreEqual(TaskStatus.WaitingForActivation, task.Status);\r\n    }\r\n\r\n    &#x5B;TestMethod]\r\n    public void ShouldCompleteStartedTaskOnComplete()\r\n    {\r\n        AsyncOperation operation = new AsyncOperation();\r\n\r\n        Task task = operation.Start();\r\n        operation.Complete();\r\n\r\n        Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);\r\n    }\r\n\r\n    &#x5B;TestMethod]\r\n    public void ShouldFailToCompleteIfNotStarted()\r\n    {\r\n        AsyncOperation operation = new AsyncOperation();\r\n\r\n        try\r\n        {\r\n            operation.Complete();\r\n            Assert.Fail(&quot;Expected exception not thrown.&quot;);\r\n        }\r\n        catch (InvalidOperationException)\r\n        {\r\n        }\r\n    }\r\n\r\n    &#x5B;TestMethod]\r\n    public void ShouldFailToCompleteIfAlreadyCompleted()\r\n    {\r\n        AsyncOperation operation = new AsyncOperation();\r\n\r\n        operation.Start();\r\n        operation.Complete();\r\n\r\n        try\r\n        {\r\n            operation.Complete();\r\n            Assert.Fail(&quot;Expected exception not thrown.&quot;);\r\n        }\r\n        catch (InvalidOperationException)\r\n        {\r\n        }\r\n    }\r\n\r\n\r\n    &#x5B;TestMethod]\r\n    public void ShouldFailToStartIfAlreadyStartedWithoutInterferingWithInitialTask()\r\n    {\r\n        AsyncOperation operation = new AsyncOperation();\r\n\r\n        Task task = operation.Start();\r\n\r\n        try\r\n        {\r\n            operation.Start();\r\n            Assert.Fail(&quot;Expected exception not 
thrown.&quot;);\r\n        }\r\n        catch (InvalidOperationException)\r\n        {\r\n        }\r\n\r\n        Assert.AreEqual(TaskStatus.WaitingForActivation, task.Status);\r\n\r\n        operation.Complete();\r\n\r\n        Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);\r\n    }\r\n}\r\n<\/pre>\n<p>But we&#8217;re dealing with concurrency and race conditions and other such not-very-easy-to-show-in-a-unit-test factors. Let&#8217;s also write a few &#8220;slow&#8221; tests which will run a lot more operations designed to conflict with each other. The basic premise is that only one caller should ever be allowed to initiate an operation instance and only one caller should be able to complete a running operation, no matter how many are trying. A test to show that this is ostensibly true might look like the following, using a thread barrier to synchronize the work phases and a countdown event to track any consistency violations:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n&#x5B;TestClass]\r\npublic class AsyncOperationSlowTest\r\n{\r\n    &#x5B;TestMethod]\r\n    public void ShouldOnlyAllowOneTaskInFlightWithManyTryingToStart()\r\n    {\r\n        Task&#x5B;] tasks = new Task&#x5B;16];\r\n        AsyncOperation operation = new AsyncOperation();\r\n        CountdownEvent counter = new CountdownEvent(2);\r\n\r\n        Action afterWork = delegate\r\n        {\r\n            counter.Reset();\r\n            operation.Complete();\r\n        };\r\n\r\n        Barrier barrier = new Barrier(tasks.Length, b =&gt; afterWork());\r\n\r\n        Action&lt;CancellationToken&gt; doWork = delegate (CancellationToken token)\r\n        {\r\n            int iterations = 0;\r\n            int exceptions = 0;\r\n            try\r\n            {\r\n                while (!token.IsCancellationRequested)\r\n                {\r\n                    ++iterations;\r\n                    bool started = false;\r\n                    try\r\n                    
{\r\n                        operation.Start();\r\n                        started = true;\r\n                    }\r\n                    catch (InvalidOperationException)\r\n                    {\r\n                        ++exceptions;\r\n                    }\r\n\r\n                    if (started)\r\n                    {\r\n                        bool consistencyViolated = counter.Signal();\r\n                        if (consistencyViolated)\r\n                        {\r\n                            throw new InvalidDataException(&quot;Consistency violated! Multiple calls to Start were allowed.&quot;);\r\n                        }\r\n                    }\r\n\r\n                    barrier.SignalAndWait(token);\r\n                }\r\n            }\r\n            catch (OperationCanceledException)\r\n            {\r\n            }\r\n\r\n            Console.WriteLine(&quot;Thread {0} ran {1} iterations and caught {2} exceptions.&quot;, Thread.CurrentThread.ManagedThreadId, iterations, exceptions);\r\n        };\r\n\r\n\r\n        using (CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30.0d)))\r\n        {\r\n            for (int i = 0; i &lt; tasks.Length; ++i)\r\n            {\r\n                tasks&#x5B;i] = Task.Factory.StartNew(() =&gt; doWork(cts.Token), TaskCreationOptions.LongRunning);\r\n            }\r\n\r\n            try\r\n            {\r\n                Task.WaitAny(tasks, cts.Token);\r\n            }\r\n            catch (OperationCanceledException)\r\n            {\r\n            }\r\n\r\n            Task.WaitAll(tasks);\r\n        }\r\n    }\r\n\r\n    &#x5B;TestMethod]\r\n    public void ShouldOnlyAllowOneTaskInFlightWithManyTryingToComplete()\r\n    {\r\n        Task&#x5B;] tasks = new Task&#x5B;16];\r\n        AsyncOperation operation = new AsyncOperation();\r\n        CountdownEvent counter = new CountdownEvent(2);\r\n\r\n        Action afterWork = delegate\r\n        {\r\n            
counter.Reset();\r\n            operation.Start();\r\n        };\r\n\r\n        afterWork();\r\n\r\n        Barrier barrier = new Barrier(tasks.Length, b =&gt; afterWork());\r\n\r\n        Action&lt;CancellationToken&gt; doWork = delegate (CancellationToken token)\r\n        {\r\n            int iterations = 0;\r\n            int exceptions = 0;\r\n            try\r\n            {\r\n                while (!token.IsCancellationRequested)\r\n                {\r\n                    ++iterations;\r\n                    bool completed = false;\r\n                    try\r\n                    {\r\n                        operation.Complete();\r\n                        completed = true;\r\n                    }\r\n                    catch (InvalidOperationException)\r\n                    {\r\n                        ++exceptions;\r\n                    }\r\n\r\n                    if (completed)\r\n                    {\r\n                        bool consistencyViolated = counter.Signal();\r\n                        if (consistencyViolated)\r\n                        {\r\n                            throw new InvalidDataException(&quot;Consistency violated! 
Multiple calls to Complete were allowed.&quot;);\r\n                        }\r\n                    }\r\n\r\n                    barrier.SignalAndWait(token);\r\n                }\r\n            }\r\n            catch (OperationCanceledException)\r\n            {\r\n            }\r\n\r\n            Console.WriteLine(&quot;Thread {0} ran {1} iterations and caught {2} exceptions.&quot;, Thread.CurrentThread.ManagedThreadId, iterations, exceptions);\r\n        };\r\n\r\n\r\n        using (CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(30.0d)))\r\n        {\r\n            for (int i = 0; i &lt; tasks.Length; ++i)\r\n            {\r\n                tasks&#x5B;i] = Task.Factory.StartNew(() =&gt; doWork(cts.Token), TaskCreationOptions.LongRunning);\r\n            }\r\n\r\n            try\r\n            {\r\n                Task.WaitAny(tasks, cts.Token);\r\n            }\r\n            catch (OperationCanceledException)\r\n            {\r\n            }\r\n\r\n            Task.WaitAll(tasks);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>(I leave it as an exercise to the reader to eliminate the duplication&#8230;)<\/p>\n<p>These tests when run in Release mode on my 12-core system output these results:<br \/>\n<code><br \/>\nTest Name:\tShouldOnlyAllowOneTaskInFlightWithManyTryingToStart<br \/>\nTest Outcome:\tPassed<br \/>\nResult StandardOutput:<br \/>\nThread 39 ran 370480 iterations and caught 346010 exceptions.<br \/>\nThread 42 ran 370480 iterations and caught 347760 exceptions.<br \/>\nThread 35 ran 370480 iterations and caught 349375 exceptions.<br \/>\nThread 41 ran 370480 iterations and caught 349517 exceptions.<br \/>\nThread 37 ran 370480 iterations and caught 347413 exceptions.<br \/>\nThread 38 ran 370480 iterations and caught 345398 exceptions.<br \/>\nThread 36 ran 370480 iterations and caught 347464 exceptions.<br \/>\nThread 4 ran 370480 iterations and caught 347261 exceptions.<br \/>\nThread 28 ran 370480 iterations and 
caught 345826 exceptions.<br \/>\nThread 32 ran 370480 iterations and caught 346443 exceptions.<br \/>\nThread 33 ran 370480 iterations and caught 345520 exceptions.<br \/>\nThread 40 ran 370480 iterations and caught 349619 exceptions.<br \/>\nThread 34 ran 370480 iterations and caught 349496 exceptions.<br \/>\nThread 8 ran 370480 iterations and caught 347067 exceptions.<br \/>\nThread 6 ran 370480 iterations and caught 346850 exceptions.<br \/>\nThread 26 ran 370480 iterations and caught 346181 exceptions.<\/p>\n<p>Test Name:\tShouldOnlyAllowOneTaskInFlightWithManyTryingToComplete<br \/>\nTest Outcome:\tPassed<br \/>\nResult StandardOutput:<br \/>\nThread 43 ran 377994 iterations and caught 353909 exceptions.<br \/>\nThread 55 ran 377994 iterations and caught 356977 exceptions.<br \/>\nThread 50 ran 377994 iterations and caught 357073 exceptions.<br \/>\nThread 53 ran 377994 iterations and caught 352969 exceptions.<br \/>\nThread 44 ran 377994 iterations and caught 352972 exceptions.<br \/>\nThread 47 ran 377994 iterations and caught 353356 exceptions.<br \/>\nThread 49 ran 377994 iterations and caught 356587 exceptions.<br \/>\nThread 45 ran 377994 iterations and caught 352553 exceptions.<br \/>\nThread 51 ran 377994 iterations and caught 354122 exceptions.<br \/>\nThread 58 ran 377994 iterations and caught 353903 exceptions.<br \/>\nThread 52 ran 377994 iterations and caught 353760 exceptions.<br \/>\nThread 54 ran 377994 iterations and caught 353365 exceptions.<br \/>\nThread 56 ran 377994 iterations and caught 357139 exceptions.<br \/>\nThread 9 ran 377994 iterations and caught 353156 exceptions.<br \/>\nThread 48 ran 377994 iterations and caught 353143 exceptions.<br \/>\nThread 57 ran 377994 iterations and caught 354926 exceptions.<br \/>\n<\/code><br \/>\nNote that the completion race test is a bit faster than the start race test because of the unconditional heap allocation of TaskCompletionSource in AsyncOperation.Start(). 
I haven&#8217;t yet come up with a way to avoid this in managed code&#8230; maybe an enterprising reader can help.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Sometimes you want asynchrony but not concurrency. For example, if you are writing data to a file, you should generally prefer asynchronous I\/O, but you probably don&#8217;t want 10 other competing callers to corrupt the contents. Perhaps the simplest way&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21,61,51],"tags":[],"class_list":["post-5261","post","type-post","status-publish","format-standard","hentry","category-async","category-concurrency","category-testing"],"_links":{"self":[{"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5261","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5261"}],"version-history":[{"count":2,"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5261\/revisions"}],"predecessor-version":[{"id":5281,"href":"https:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5261\/revisions\/5281"}],"wp:attachment":[{"href":"https:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5261"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5261"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5261"}],"curies":[{"name":"wp","href":"https:\/\/ap
i.w.org\/{rel}","templated":true}]}}