You may have heard of System.IO.Pipelines, a set of classes to aid in writing high-performance streaming server code. But what about datagram scenarios, e.g. using UDP? That would seem like a job for System.Threading.Channels. Let’s try it and do some high-performance “datagramming”!
Our scenario today will be a simple datagram client and server, transmitting small string messages of the form “Message nnn”. We will use the netcoreapp3.1 target framework to take advantage of the improved stability and performance of .NET Core 3.x.
Let’s start with the entry point (which uses the new-ish async main and using declaration features):
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    internal sealed class Program
    {
        private static async Task Main(string[] args)
        {
            using CancellationTokenSource cts = new CancellationTokenSource();
            List<Task> tasks = new List<Task>();
            const ushort Port = 9999;
            MessageCount count = new MessageCount();
            string arg = (args.Length == 1) ? args[0].ToUpperInvariant() : string.Empty;
            switch (arg)
            {
                default:
                    Console.WriteLine("Please specify 'client' or 'server'.");
                    return;
                case "CLIENT":
                    tasks.AddRange(DatagramClient.Start(count, Port, cts.Token));
                    break;
                case "SERVER":
                    tasks.AddRange(DatagramServer.Start(count, Port, cts.Token));
                    break;
            }

            tasks.Add(TaskEx.RunAsync(nameof(LogAsync), count, cts.Token, (c, t) => LogAsync(c, t)));
            tasks.Add(Task.Run(() => Prompt()));

            // The first task to finish (normally the ENTER prompt) triggers cancellation of the rest.
            await Task.WhenAny(tasks);
            Console.WriteLine("Canceling...");
            cts.Cancel();
            await Task.WhenAll(tasks);
        }

        private static void Prompt()
        {
            Console.WriteLine("Press ENTER to quit.");
            Console.ReadLine();
        }

        private static async Task LogAsync(MessageCount count, CancellationToken token)
        {
            Stopwatch stopwatch = Stopwatch.StartNew();
            while (!token.IsCancellationRequested)
            {
                await Task.Delay(1000, token);
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds},{count.Value}");
            }
        }
    }
We’re setting up the basic framework here, defining the client and server as a sequence of background tasks. The user can press ENTER to cancel everything, and a periodic logging task tracks the message count. The supporting class TaskEx helps standardize the error handling and cancellation patterns:
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    internal static class TaskEx
    {
        public static async Task RunAsync<T>(string name, T input, CancellationToken token, Func<T, CancellationToken, Task> doAsync)
        {
            Console.WriteLine($"{name} starting...");
            try
            {
                await doAsync(input, token);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine($"{name} canceled.");
            }
            catch (Exception e)
            {
                Console.WriteLine($"{name} failed: {e}");
            }
        }
    }
And of course MessageCount is just a Java-style reference type wrapper:
    internal sealed class MessageCount
    {
        public long Value { get; private set; }

        public void Increment() => this.Value++;
    }
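The plain wrapper is enough here because each process increments the count from a single task, while the logging task only reads it. If several tasks ever incremented the same counter, `Value++` could lose updates. A minimal sketch of a thread-safe variant follows; the names `AtomicMessageCount` and `AtomicMessageCountDemo` are hypothetical and not part of the original sample.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical thread-safe variant of MessageCount (not in the original sample).
internal sealed class AtomicMessageCount
{
    private long value;

    // Interlocked.Read gives an atomic 64-bit read, even in a 32-bit process.
    public long Value => Interlocked.Read(ref this.value);

    // Interlocked.Increment performs the read-modify-write as one atomic step.
    public void Increment() => Interlocked.Increment(ref this.value);
}

internal static class AtomicMessageCountDemo
{
    // Increments one shared counter from several tasks at once and returns the final value.
    public static long Run(int taskCount, int incrementsPerTask)
    {
        AtomicMessageCount count = new AtomicMessageCount();
        Task[] tasks = new Task[taskCount];
        for (int i = 0; i < taskCount; ++i)
        {
            tasks[i] = Task.Run(() =>
            {
                for (int j = 0; j < incrementsPerTask; ++j)
                {
                    count.Increment();
                }
            });
        }

        Task.WaitAll(tasks);
        return count.Value;
    }
}
```

Calling `AtomicMessageCountDemo.Run(4, 100000)` should return exactly `taskCount * incrementsPerTask`, since no increment can be lost. The atomic operations carry a small cost, which is one reason to keep the simpler wrapper when only one task writes.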
The client just sends UDP messages in a tight loop, using an initial Task.Yield so that the synchronous send loop does not block the calling thread forever (also note the gratuitous use of the value tuple in the SendAsync method):
    using System.Collections.Generic;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    internal static class DatagramClient
    {
        public static IEnumerable<Task> Start(MessageCount count, ushort port, CancellationToken token)
        {
            yield return TaskEx.RunAsync(nameof(SendAsync), (count, port), token, (c, t) => SendAsync(c, t));
        }

        private static async Task SendAsync((MessageCount, ushort) x, CancellationToken token)
        {
            // Return control to the caller before entering the synchronous send loop.
            await Task.Yield();

            MessageCount count = x.Item1;
            ushort port = x.Item2;
            using UdpClient client = new UdpClient();
            client.Connect(IPAddress.Loopback, port);
            while (!token.IsCancellationRequested)
            {
                count.Increment();
                byte[] dgram = Encoding.UTF8.GetBytes("Message " + count.Value);
                client.Send(dgram, dgram.Length);
            }
        }
    }
The boilerplate is done. We can move on to the main event — a datagram server which, as promised, uses a Channel<string> as a producer/consumer queue:
    using System;
    using System.Collections.Generic;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    internal static class DatagramServer
    {
        public static IEnumerable<Task> Start(MessageCount count, ushort port, CancellationToken token)
        {
            Channel<string> channel = Channel.CreateBounded<string>(64);
            yield return TaskEx.RunAsync(nameof(ConsumeAsync), (count, channel.Reader), token, (c, t) => ConsumeAsync(c, t));
            yield return TaskEx.RunAsync(nameof(ProduceAsync), (port, channel.Writer), token, (x, t) => ProduceAsync(x, t));
        }

        private static async Task ProduceAsync((ushort, ChannelWriter<string>) x, CancellationToken token)
        {
            ushort port = x.Item1;
            ChannelWriter<string> writer = x.Item2;
            using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            socket.Bind(new IPEndPoint(IPAddress.Loopback, port));
            while (!token.IsCancellationRequested)
            {
                byte[] buffer = new byte[64];
                int size = await socket.ReceiveAsync(new Memory<byte>(buffer), SocketFlags.None, token);
                string message = Encoding.UTF8.GetString(buffer, 0, size);

                // Wait for free space whenever the bounded channel is full.
                while (!writer.TryWrite(message))
                {
                    await writer.WaitToWriteAsync(token);
                }
            }
        }

        private static async Task ConsumeAsync((MessageCount, ChannelReader<string>) x, CancellationToken token)
        {
            MessageCount count = x.Item1;
            ChannelReader<string> reader = x.Item2;
            while (!token.IsCancellationRequested)
            {
                // Wait for data whenever the channel is empty.
                string message;
                while (!reader.TryRead(out message))
                {
                    await reader.WaitToReadAsync(token);
                }

                count.Increment();
                int checksum = Fletcher32(message);
                if (checksum == 0x12345678)
                {
                    Console.WriteLine("!");
                }
            }
        }

        private static int Fletcher32(string data)
        {
            int sum1 = 0;
            int sum2 = 0;
            foreach (char c in data)
            {
                sum1 = (sum1 + c) % 0xFFFF;
                sum2 = (sum2 + sum1) % 0xFFFF;
            }

            return (sum2 << 16) | sum1;
        }
    }
Note that we start the consumer before the producer to potentially avoid long chains of synchronous continuations on the socket receive side; an initial Task.Yield would have also been fine. Although there are a few dozen lines here, there is nothing too exotic. The producer side reads from a socket (using the new ValueTask-based ReceiveAsync overload) and writes the parsed string to the channel. The consumer reads the string from the channel and, in an effort to do “real work” with the result, calculates Fletcher’s checksum for the data.
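For comparison, the same producer/consumer handoff can also be written with the higher-level `WriteAsync` and `ReadAllAsync` methods instead of the `TryWrite`/`WaitToWriteAsync` and `TryRead`/`WaitToReadAsync` loops. The following is only a sketch under a few assumptions: the names `ChannelSketch` and `RunAsync` are hypothetical, an in-memory loop stands in for the socket, and the `SingleReader`/`SingleWriter` hints are an optional extra that the server above does not set.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-alone sketch; an in-memory loop replaces the UDP socket.
internal static class ChannelSketch
{
    // Pushes messageCount strings through a bounded channel and
    // returns how many well-formed messages the consumer saw.
    public static async Task<int> RunAsync(int messageCount)
    {
        BoundedChannelOptions options = new BoundedChannelOptions(64)
        {
            SingleReader = true,
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait,
        };
        Channel<string> channel = Channel.CreateBounded<string>(options);

        // Start the consumer first, as in the server above.
        Task<int> consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer, messageCount);
        return await consumer;
    }

    private static async Task ProduceAsync(ChannelWriter<string> writer, int messageCount)
    {
        for (int i = 1; i <= messageCount; ++i)
        {
            // WriteAsync waits for free space when the channel is full.
            await writer.WriteAsync("Message " + i);
        }

        // Completing the writer lets the consumer's loop end once the channel drains.
        writer.Complete();
    }

    private static async Task<int> ConsumeAsync(ChannelReader<string> reader)
    {
        int seen = 0;
        await foreach (string message in reader.ReadAllAsync())
        {
            if (message.StartsWith("Message ", StringComparison.Ordinal))
            {
                ++seen;
            }
        }

        return seen;
    }
}
```

Whether this form is faster or slower than the explicit loops is not something this sketch measures; it mainly trades a little control for less code and a clean shutdown path via `Complete`.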
This is a pretty standard channel use case, so one would hope the performance is decent. Indeed (at least on my machine) I see more than acceptable throughput, nearly 137K messages per second.
However, if we stop and think about this for a minute, it is not clear we need a channel at all. This is a single socket with a single producer and a single consumer. Isn’t the socket itself already the “queue”? What we have done seems to be no more than double buffering (and not the good kind). Could we perhaps run even faster if we ditch the channel entirely? Let’s find out:
    public static IEnumerable<Task> Start(MessageCount count, ushort port, CancellationToken token)
    {
        // Use a single background task to receive from the socket
        yield return TaskEx.RunAsync(nameof(ReceiveAsync), (count, port), token, (x, t) => ReceiveAsync(x, t));
    }

    // Do all the work directly in here
    private static async Task ReceiveAsync((MessageCount, ushort) x, CancellationToken token)
    {
        MessageCount count = x.Item1;
        ushort port = x.Item2;
        using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        socket.Bind(new IPEndPoint(IPAddress.Loopback, port));
        while (!token.IsCancellationRequested)
        {
            byte[] buffer = new byte[64];
            int size = await socket.ReceiveAsync(new Memory<byte>(buffer), SocketFlags.None, token);
            string message = Encoding.UTF8.GetString(buffer, 0, size);
            count.Increment();
            int checksum = Fletcher32(message);
            if (checksum == 0x12345678)
            {
                Console.WriteLine("!");
            }
        }
    }
The code is simpler. Will the lower complexity be rewarded with higher performance?
The results look positive. We’re seeing a rate around 146K/sec now, nearly 7% faster than before — admittedly not huge but certainly big enough to be meaningful.
So what is the conclusion? Should we not use channels? To paraphrase Alan Page, you should use channels in 100% of the cases that can benefit from their use. For example, if you needed to ensure single threaded consumption in an inherently multithreaded pipeline, channels would be your best bet.
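To make that last case concrete, here is a rough sketch of several producers writing concurrently while one consumer updates state that is not thread safe. The names `FanInSketch` and `RunAsync` are hypothetical, and simple integers stand in for real messages.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: many concurrent producers, exactly one consumer.
internal static class FanInSketch
{
    // Runs producerCount concurrent producers feeding one consumer and
    // returns the total that the consumer computed.
    public static async Task<long> RunAsync(int producerCount, int itemsPerProducer)
    {
        Channel<int> channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(64) { SingleReader = true, SingleWriter = false });

        Task<long> consumer = ConsumeAsync(channel.Reader);

        List<Task> producers = new List<Task>();
        for (int p = 0; p < producerCount; ++p)
        {
            producers.Add(Task.Run(() => ProduceAsync(channel.Writer, itemsPerProducer)));
        }

        // Complete the channel only after every producer has finished writing.
        await Task.WhenAll(producers);
        channel.Writer.Complete();
        return await consumer;
    }

    private static async Task ProduceAsync(ChannelWriter<int> writer, int items)
    {
        for (int i = 1; i <= items; ++i)
        {
            await writer.WriteAsync(i);
        }
    }

    private static async Task<long> ConsumeAsync(ChannelReader<int> reader)
    {
        // Plain local state with no locks: only this one loop ever touches it.
        long total = 0;
        await foreach (int item in reader.ReadAllAsync())
        {
            total += item;
        }

        return total;
    }
}
```

The channel serializes the handoff, so the consumer needs no synchronization of its own even though the writes arrive from several thread pool threads.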
As it turned out, this particular socket problem simply had another more efficient option with a different set of tradeoffs. But we aren’t done yet — check back next time to see how we can get additional easy (and other not-so-easy) performance gains.