We’ve been looking at some tips and tricks for achieving high performance in a skeletal datagram server. And yet, the implementation so far is single threaded. This is fine if strict packet ordering must be maintained. But we’re talking about datagram protocols here — lack of ordering is a feature!
Normally it is unwise to make assumptions about multithreading support for a given .NET class. However, we are in luck — the .NET Socket class is documented to be thread safe. So we should have no qualms issuing multiple pending asynchronous receives. Things are murkier for the threading requirements of UdpClient — to be safe, we’ll just create one instance per worker.
First, we have a new entry point that takes an optional workers argument to control concurrency:
private static async Task Main(string[] args)
{
    using CancellationTokenSource cts = new CancellationTokenSource();
    List<Task> tasks = new List<Task>();
    const ushort Port = 9999;
    MessageCount count = new MessageCount();
    string arg = (args.Length > 0) ? args[0].ToUpperInvariant() : string.Empty;
    int workers = 1;
    if ((args.Length > 1) && !int.TryParse(args[1], out workers))
    {
        workers = 1;
    }

    switch (arg)
    {
        default:
            Console.WriteLine("Please specify 'client' or 'server'.");
            return;
        case "CLIENT":
            tasks.AddRange(DatagramClient.Start(count, Port, workers, cts.Token));
            break;
        case "SERVER":
            tasks.AddRange(DatagramServer.Start(count, Port, workers, cts.Token));
            break;
    }

    tasks.Add(TaskEx.RunAsync(nameof(LogAsync), count, cts.Token, (c, t) => LogAsync(c, t)));
    tasks.Add(Task.Run(() => Prompt()));

    await Task.WhenAny(tasks);
    Console.WriteLine("Canceling...");
    cts.Cancel();
    await Task.WhenAll(tasks);
}
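For reference, running the sample from two separate terminals might look like the following (the exact invocation depends on how the project is set up; any way of passing the two arguments works):

dotnet run -- server 4
dotnet run -- client 4

The first argument selects the role and the second is the optional worker count, which defaults to 1.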
Given the concurrent nature of the messaging, we have to make MessageCount thread safe as well:
internal sealed class MessageCount
{
    private long value;

    public long Value => Thread.VolatileRead(ref this.value);

    public long Increment() => Interlocked.Increment(ref this.value);
}
Then we have the new client, which can send from many UdpClients in parallel:
internal static class DatagramClient
{
    public static IEnumerable<Task> Start(MessageCount count, ushort port, int workers, CancellationToken token)
    {
        for (int i = 0; i < workers; ++i)
        {
            yield return TaskEx.RunAsync(nameof(SendAsync) + "_" + i, (count, port), token, (c, t) => SendAsync(c, t));
        }
    }

    private static async Task SendAsync((MessageCount, ushort) x, CancellationToken token)
    {
        await Task.Yield();
        MessageCount count = x.Item1;
        ushort port = x.Item2;
        using UdpClient client = new UdpClient();
        client.Connect(IPAddress.Loopback, port);
        byte[] dgram = Encoding.UTF8.GetBytes("Message X");
        int last = dgram.Length - 1;
        while (!token.IsCancellationRequested)
        {
            dgram[last] = (byte)('0' + (count.Increment() % 10));
            client.Send(dgram, last + 1);
        }
    }
}
We learned our lesson from last time about allocating so many strings. In this simple example, to avoid writing a fully general itoa method, we're now using a constant-sized buffer where only the last digit changes on each send.
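If we ever needed more than a single digit, one allocation-free alternative would be to format the counter directly into the datagram buffer with Utf8Formatter. This is just a sketch, not part of the sample; the helper name and the "Message " prefix are made up for illustration:

using System;
using System.Buffers.Text;
using System.Text;

internal static class CounterFormat
{
    private static readonly byte[] Prefix = Encoding.UTF8.GetBytes("Message ");

    // Writes "Message <counter>" into dgram without allocating a string;
    // returns the number of bytes to send.
    public static int Format(long counter, Span<byte> dgram)
    {
        Prefix.AsSpan().CopyTo(dgram);
        if (!Utf8Formatter.TryFormat(counter, dgram.Slice(Prefix.Length), out int written))
        {
            throw new ArgumentException("Buffer too small.", nameof(dgram));
        }

        return Prefix.Length + written;
    }
}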
Now that the server is multithreaded, we need to figure out how to handle disposal of the shared socket instance. This should do the trick:
internal static class DatagramServer
{
    public static IEnumerable<Task> Start(MessageCount count, ushort port, int workers, CancellationToken token)
    {
        Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        socket.Bind(new IPEndPoint(IPAddress.Loopback, port));
        for (int i = 0; i < workers; ++i)
        {
            yield return TaskEx.RunAsync(nameof(ReceiveAsync) + "_" + i, (count, socket), token, (x, t) => ReceiveAsync(x, t));
        }

        yield return TaskEx.RunAsync(nameof(DisposeOnCancelAsync), socket, token, (s, t) => DisposeOnCancelAsync(s, t));
    }

    private static async Task DisposeOnCancelAsync(Socket socket, CancellationToken token)
    {
        using (socket)
        {
            await Task.Delay(-1, token);
        }
    }

    private static async Task ReceiveAsync((MessageCount, Socket) x, CancellationToken token)
    {
        await Task.Yield();
        MessageCount count = x.Item1;
        Socket socket = x.Item2;
        byte[] bytes = new byte[64];
        Memory<byte> buffer = new Memory<byte>(bytes);
        char[] chars = new char[64];
        while (!token.IsCancellationRequested)
        {
            int size = await socket.ReceiveAsync(buffer, SocketFlags.None, token);
            int length = Encoding.UTF8.GetChars(bytes, 0, size, chars, 0);
            count.Increment();
            int checksum = Fletcher32(new ReadOnlySpan<char>(chars, 0, length));
            if (checksum == 0x12345678)
            {
                Console.WriteLine("!");
            }
        }
    }

    private static int Fletcher32(ReadOnlySpan<char> data)
    {
        int sum1 = 0;
        int sum2 = 0;
        foreach (char c in data)
        {
            sum1 = (sum1 + c) % 0xFFFF;
            sum2 = (sum2 + sum1) % 0xFFFF;
        }

        return (sum2 << 16) | sum1;
    }
}
Also note the newly added Task.Yield. In practice, it seems that the higher message volume achieved with more client-side workers occasionally results in unending chains of synchronously completing async operations on the server side.
These results show the total server throughput given equal numbers of client and server workers:
We see a steady increase in throughput as workers increase, with four workers emerging as the winner in this contest (~370K/sec). Interestingly, the eight worker case does slightly worse (~354K/sec), probably in part because the eight clients are relentlessly spamming the server. In a real-world scenario, we would probably want to keep trying different client and server worker combinations to find the best result. But this is already a very contrived example — there aren’t many situations where we would have a local client blasting messages as fast as possible alongside a receiver on the same machine.
Not only does this implementation achieve better raw throughput, it can also help mitigate the impact of a stuck worker. Of course, we are not intentionally blocking here, but depending on what the receiver does with the message after it is read from the socket, we could experience long delays or run into programming errors like unhandled exceptions. This potential problem highlights what is perhaps the most powerful benefit of actually using channels. In the original channel example I showed, the receiver logic was only concerned with receives; beyond that, it simply pushed the received data to a channel for another decoupled processing component to worry about. It could only ever get blocked if the channel itself was blocked or if the receive operation went wrong (e.g. a broken socket, though this is less likely in a connectionless protocol).
Adding processing logic in the receive path admittedly increases the risk of unintended consequences interfering with the flow of data. Then again, there are different error cases to consider in the decoupled approach, such as what to do when a bounded channel runs out of capacity (now you have a flow control problem). As always, there is no true answer that works for all contexts — just different tradeoffs to consider.
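To make the contrast concrete, here is a rough sketch of the decoupled shape using System.Threading.Channels. This is not the original channel example from the earlier post; the names, the 64-byte buffer, and the capacity of 1024 are assumptions for illustration:

using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelReceiver
{
    // Bounded so that a slow processor eventually pushes back on the receiver.
    public static Channel<ReadOnlyMemory<byte>> CreateChannel() =>
        Channel.CreateBounded<ReadOnlyMemory<byte>>(
            new BoundedChannelOptions(1024) { FullMode = BoundedChannelFullMode.Wait });

    // Only reads from the socket and hands the datagram off to the channel.
    public static async Task ReceiveLoopAsync(
        Socket socket, ChannelWriter<ReadOnlyMemory<byte>> writer, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // A fresh buffer per datagram because ownership moves to the channel reader.
            byte[] bytes = new byte[64];
            int size = await socket.ReceiveAsync(bytes.AsMemory(), SocketFlags.None, token);

            // If the channel is full, this waits, applying back pressure to the receiver.
            await writer.WriteAsync(bytes.AsMemory(0, size), token);
        }
    }

    // Consumes at its own pace; slow or failing processing no longer stalls the socket.
    public static async Task ProcessLoopAsync(
        ChannelReader<ReadOnlyMemory<byte>> reader, CancellationToken token)
    {
        await foreach (ReadOnlyMemory<byte> datagram in reader.ReadAllAsync(token))
        {
            // Whatever processing we want happens here, decoupled from the receive path.
        }
    }
}

With BoundedChannelFullMode.Wait, a full channel pushes back on the receive loop; choosing one of the Drop modes instead trades back pressure for lost messages, which is exactly the flow control decision mentioned above.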