{"id":5662,"date":"2019-12-27T15:00:53","date_gmt":"2019-12-27T15:00:53","guid":{"rendered":"http:\/\/writeasync.net\/?p=5662"},"modified":"2019-12-26T16:46:36","modified_gmt":"2019-12-26T16:46:36","slug":"high-performance-datagramming-channels","status":"publish","type":"post","link":"http:\/\/writeasync.net\/?p=5662","title":{"rendered":"High performance datagramming: channels?"},"content":{"rendered":"<p>You may have heard of <a href=\"https:\/\/devblogs.microsoft.com\/dotnet\/system-io-pipelines-high-performance-io-in-net\/\">System.IO.Pipelines<\/a>, a set of classes to aid in writing high performance streaming server code. But what about <a href=\"https:\/\/en.wikipedia.org\/wiki\/Datagram\">datagram<\/a> scenarios, e.g. using <a href=\"https:\/\/en.wikipedia.org\/wiki\/User_Datagram_Protocol\">UDP<\/a>? That would seem like a job for <a href=\"https:\/\/devblogs.microsoft.com\/dotnet\/an-introduction-to-system-threading-channels\/\">System.Threading.Channels<\/a>. Let&#8217;s try it and do some high performance &#8220;datagramming&#8221;!<\/p>\n<p>Our scenario today will be a simple datagram client and server, transmitting small string messages of the form &#8220;Message <em>nnn<\/em>&#8220;. 
We will use the netcoreapp3.1 <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/standard\/frameworks#supported-target-framework-versions\">target framework<\/a> to take advantage of the <a href=\"https:\/\/devblogs.microsoft.com\/dotnet\/announcing-net-core-3-1\/\">improved stability and performance of .NET Core 3.x<\/a>.<\/p>\n<p>Let&#8217;s start with the entry point (which uses the new-ish <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/csharp\/whats-new\/csharp-7-1#async-main\">async main<\/a> and <a href=\"https:\/\/www.telerik.com\/blogs\/c-8-static-local-functions-and-using-declarations\">using declaration<\/a> features):<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing System;\r\nusing System.Collections.Generic;\r\nusing System.Diagnostics;\r\nusing System.Threading;\r\nusing System.Threading.Tasks;\r\n\r\ninternal sealed class Program\r\n{\r\n    private static async Task Main(string&#x5B;] args)\r\n    {\r\n        using CancellationTokenSource cts = new CancellationTokenSource();\r\n        List&lt;Task&gt; tasks = new List&lt;Task&gt;();\r\n        const ushort Port = 9999;\r\n        MessageCount count = new MessageCount();\r\n        string arg = (args.Length == 1) ? 
args&#x5B;0].ToUpperInvariant() : string.Empty;\r\n        switch (arg)\r\n        {\r\n            default:\r\n                Console.WriteLine(&quot;Please specify 'client' or 'server'.&quot;);\r\n                return;\r\n            case &quot;CLIENT&quot;:\r\n                tasks.AddRange(DatagramClient.Start(count, Port, cts.Token));\r\n                break;\r\n            case &quot;SERVER&quot;:\r\n                tasks.AddRange(DatagramServer.Start(count, Port, cts.Token));\r\n                break;\r\n        }\r\n\r\n        tasks.Add(TaskEx.RunAsync(nameof(LogAsync), count, cts.Token, (c, t) =&gt; LogAsync(c, t)));\r\n        tasks.Add(Task.Run(() =&gt; Prompt()));\r\n\r\n        await Task.WhenAny(tasks);\r\n\r\n        Console.WriteLine(&quot;Canceling...&quot;);\r\n        cts.Cancel();\r\n\r\n        await Task.WhenAll(tasks);\r\n    }\r\n\r\n    private static void Prompt()\r\n    {\r\n        Console.WriteLine(&quot;Press ENTER to quit.&quot;);\r\n        Console.ReadLine();\r\n    }\r\n\r\n    private static async Task LogAsync(MessageCount count, CancellationToken token)\r\n    {\r\n        Stopwatch stopwatch = Stopwatch.StartNew();\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            await Task.Delay(1000, token);\r\n            Console.WriteLine($&quot;{stopwatch.ElapsedMilliseconds},{count.Value}&quot;);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>We&#8217;re setting up the basic framework here, defining the client and server as a sequence of background tasks. The user can press ENTER to cancel everything, and a periodic logging task prints the elapsed milliseconds and the running message count once per second. 
The supporting class <code>TaskEx<\/code> helps standardize the error handling and cancellation patterns:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing System;\r\nusing System.Threading;\r\nusing System.Threading.Tasks;\r\n\r\ninternal static class TaskEx\r\n{\r\n    public static async Task RunAsync&lt;T&gt;(string name, T input, CancellationToken token, Func&lt;T, CancellationToken, Task&gt; doAsync)\r\n    {\r\n        Console.WriteLine($&quot;{name} starting...&quot;);\r\n        try\r\n        {\r\n            await doAsync(input, token);\r\n        }\r\n        catch (OperationCanceledException)\r\n        {\r\n            Console.WriteLine($&quot;{name} canceled.&quot;);\r\n        }\r\n        catch (Exception e)\r\n        {\r\n            Console.WriteLine($&quot;{name} failed: {e}&quot;);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>And of course <code>MessageCount<\/code> is just a <a href=\"https:\/\/stackoverflow.com\/questions\/20804862\/wrapper-classes-and-call-by-reference-in-java\/20804991#20804991\">Java-style reference type wrapper<\/a>:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\ninternal sealed class MessageCount\r\n{\r\n    public long Value { get; private set; }\r\n\r\n    public void Increment() =&gt; this.Value++;\r\n}\r\n<\/pre>\n<p>The client just sends UDP messages in a tight loop. An initial Task.Yield lets the synchronous send loop continue on a thread pool thread instead of blocking the calling thread (here, <code>Main<\/code>) forever. Also note the gratuitous use of the <a href=\"https:\/\/blogs.msdn.microsoft.com\/mazhou\/2017\/05\/26\/c-7-series-part-1-value-tuples\/\">value tuple<\/a> in the <code>SendAsync<\/code> method:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing System.Collections.Generic;\r\nusing System.Net;\r\nusing System.Net.Sockets;\r\nusing System.Text;\r\nusing System.Threading;\r\nusing System.Threading.Tasks;\r\n\r\ninternal static class DatagramClient\r\n{\r\n    public static IEnumerable&lt;Task&gt; Start(MessageCount count, 
ushort port, CancellationToken token)\r\n    {\r\n        yield return TaskEx.RunAsync(nameof(SendAsync), (count, port), token, (c, t) =&gt; SendAsync(c, t));\r\n    }\r\n\r\n    private static async Task SendAsync((MessageCount, ushort) x, CancellationToken token)\r\n    {\r\n        await Task.Yield();\r\n\r\n        MessageCount count = x.Item1;\r\n        ushort port = x.Item2;\r\n        using UdpClient client = new UdpClient();\r\n        client.Connect(IPAddress.Loopback, port);\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            count.Increment();\r\n            byte&#x5B;] dgram = Encoding.UTF8.GetBytes(&quot;Message &quot; + count.Value);\r\n            client.Send(dgram, dgram.Length);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>The boilerplate is done. We can move on to the main event &#8212; a datagram server which, as promised, uses a <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/api\/system.threading.channels.channel-1?view=netcore-3.1\">Channel&lt;string&gt;<\/a> as a producer\/consumer queue:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing System;\r\nusing System.Collections.Generic;\r\nusing System.Net;\r\nusing System.Net.Sockets;\r\nusing System.Text;\r\nusing System.Threading;\r\nusing System.Threading.Channels;\r\nusing System.Threading.Tasks;\r\n\r\ninternal static class DatagramServer\r\n{\r\n    public static IEnumerable&lt;Task&gt; Start(MessageCount count, ushort port, CancellationToken token)\r\n    {\r\n        Channel&lt;string&gt; channel = Channel.CreateBounded&lt;string&gt;(64);\r\n        yield return TaskEx.RunAsync(nameof(ConsumeAsync), (count, channel.Reader), token, (c, t) =&gt; ConsumeAsync(c, t));\r\n        yield return TaskEx.RunAsync(nameof(ProduceAsync), (port, channel.Writer), token, (x, t) =&gt; ProduceAsync(x, t));\r\n    }\r\n\r\n    private static async Task ProduceAsync((ushort, ChannelWriter&lt;string&gt;) x, CancellationToken token)\r\n    {\r\n        
ushort port = x.Item1;\r\n        ChannelWriter&lt;string&gt; writer = x.Item2;\r\n        using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);\r\n        socket.Bind(new IPEndPoint(IPAddress.Loopback, port));\r\n\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            byte&#x5B;] buffer = new byte&#x5B;64];\r\n            int size = await socket.ReceiveAsync(new Memory&lt;byte&gt;(buffer), SocketFlags.None, token);\r\n            string message = Encoding.UTF8.GetString(buffer, 0, size);\r\n            while (!writer.TryWrite(message))\r\n            {\r\n                await writer.WaitToWriteAsync(token);\r\n            }\r\n        }\r\n    }\r\n\r\n    private static async Task ConsumeAsync((MessageCount, ChannelReader&lt;string&gt;) x, CancellationToken token)\r\n    {\r\n        MessageCount count = x.Item1;\r\n        ChannelReader&lt;string&gt; reader = x.Item2;\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            string message;\r\n            while (!reader.TryRead(out message))\r\n            {\r\n                await reader.WaitToReadAsync(token);\r\n            }\r\n\r\n            count.Increment();\r\n            int checksum = Fletcher32(message);\r\n            if (checksum == 0x12345678)\r\n            {\r\n                Console.WriteLine(&quot;!&quot;);\r\n            }\r\n        }\r\n    }\r\n\r\n    private static int Fletcher32(string data)\r\n    {\r\n        int sum1 = 0;\r\n        int sum2 = 0;\r\n        foreach (char c in data)\r\n        {\r\n            sum1 = (sum1 + c) % 0xFFFF;\r\n            sum2 = (sum2 + sum1) % 0xFFFF;\r\n        }\r\n\r\n        return (sum2 &lt;&lt; 16) | sum1;\r\n    }\r\n}\r\n<\/pre>\n<p>Note that we start the consumer before the producer to potentially avoid long chains of synchronous continuations on the socket receive side; an initial Task.Yield would have also been fine. 
Although there are a few dozen lines here, there is nothing too exotic. The producer side reads from a socket (using <a href=\"https:\/\/devblogs.microsoft.com\/dotnet\/understanding-the-whys-whats-and-whens-of-valuetask\/\">the new ValueTask<\/a>-based <a href=\"https:\/\/docs.microsoft.com\/en-us\/dotnet\/api\/system.net.sockets.sockettaskextensions.receiveasync?view=netcore-3.1#System_Net_Sockets_SocketTaskExtensions_ReceiveAsync_System_Net_Sockets_Socket_System_Memory_System_Byte__System_Net_Sockets_SocketFlags_System_Threading_CancellationToken_\">ReceiveAsync overload<\/a>), decodes each datagram from UTF-8 into a string, and writes that string to the channel. The consumer reads the string from the channel and, in an effort to do &#8220;real work&#8221; with the result, calculates the 32-bit variant of <a href=\"https:\/\/en.wikipedia.org\/wiki\/Fletcher%27s_checksum\">Fletcher&#8217;s checksum<\/a> over the message.<\/p>\n<p>This is a fairly standard channel use case, so one would hope the performance is decent. Indeed (at least on my machine) I see more than acceptable throughput &#8212; nearly 137K messages per second:<br \/>\n<a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-with-channel.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-with-channel-300x180.png\" alt=\"Datagram with channel (~137K\/sec)\" width=\"300\" height=\"180\" class=\"alignnone size-medium wp-image-5665\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-with-channel-300x180.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-with-channel.png 481w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p>However, if we stop and think about this for a minute, it is not clear we need a channel at all. This is a single socket with a single producer and a single consumer. Isn&#8217;t the socket itself already the &#8220;queue&#8221;? 
What we have done seems to be no more than double buffering (and not <a href=\"https:\/\/wiki.osdev.org\/Double_Buffering\">the good kind<\/a>). Could we perhaps run even faster if we ditch the channel entirely? Let&#8217;s find out:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\n    public static IEnumerable&lt;Task&gt; Start(MessageCount count, ushort port, CancellationToken token)\r\n    {\r\n        \/\/ Use a single background task to receive from the socket\r\n        yield return TaskEx.RunAsync(nameof(ReceiveAsync), (count, port), token, (x, t) =&gt; ReceiveAsync(x, t));\r\n    }\r\n\r\n    \/\/ Do all the work directly in here\r\n    private static async Task ReceiveAsync((MessageCount, ushort) x, CancellationToken token)\r\n    {\r\n        MessageCount count = x.Item1;\r\n        ushort port = x.Item2;\r\n        using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);\r\n        socket.Bind(new IPEndPoint(IPAddress.Loopback, port));\r\n\r\n        while (!token.IsCancellationRequested)\r\n        {\r\n            byte&#x5B;] buffer = new byte&#x5B;64];\r\n            int size = await socket.ReceiveAsync(new Memory&lt;byte&gt;(buffer), SocketFlags.None, token);\r\n            string message = Encoding.UTF8.GetString(buffer, 0, size);\r\n            count.Increment();\r\n            int checksum = Fletcher32(message);\r\n            if (checksum == 0x12345678)\r\n            {\r\n                Console.WriteLine(&quot;!&quot;);\r\n            }\r\n        }\r\n    }\r\n<\/pre>\n<p>The code is simpler. 
Will the lower complexity be rewarded with higher performance?<br \/>\n<a href=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-without-channel.png\"><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-without-channel-300x180.png\" alt=\"Datagram without channel (~146K\/sec)\" width=\"300\" height=\"180\" class=\"alignnone size-medium wp-image-5666\" srcset=\"http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-without-channel-300x180.png 300w, http:\/\/writeasync.net\/wp-content\/uploads\/2019\/12\/Datagram-without-channel.png 481w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p>The results look positive. We&#8217;re seeing a rate around 146K\/sec now, nearly 7% faster than the ~137K\/sec from before &#8212; admittedly not <em>huge<\/em>, but certainly big enough to be meaningful.<\/p>\n<p>So what is the conclusion? Should we not use channels? To paraphrase <a href=\"https:\/\/sqa.stackexchange.com\/a\/135\">Alan Page<\/a>, you should use channels in 100% of the cases that can benefit from their use. For example, if you needed to ensure single-threaded consumption in an inherently multithreaded pipeline, channels would be your best bet.<\/p>\n<p>As it turned out, this particular socket problem simply had another, more efficient option with a different set of tradeoffs. But we aren&#8217;t done yet &#8212; check back next time to see how we can get some additional easy (and some not-so-easy) performance gains.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>You may have heard of System.IO.Pipelines, a set of classes to aid in writing high performance streaming server code. But what about datagram scenarios, e.g. using UDP? That would seem like a job for System.Threading.Channels. 
Let&#8217;s try it and do&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21,104],"tags":[],"class_list":["post-5662","post","type-post","status-publish","format-standard","hentry","category-async","category-performance"],"_links":{"self":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5662","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5662"}],"version-history":[{"count":3,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5662\/revisions"}],"predecessor-version":[{"id":5667,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/5662\/revisions\/5667"}],"wp:attachment":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5662"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5662"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5662"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}