MemoryChannel and concurrency


In a previous post, I described MemoryChannel and how I used TDD to implement it. After the 25th commit, 20 unit tests later, I had a fully functional but single-threaded implementation.

Simply by inspection, I knew that this code would fail almost immediately if a sender and receiver ever ran on concurrent threads. For example, Send and ReceiveAsync can both modify the excessBuffers LinkedList at the same time, which may corrupt its internal state.

I figured that a thread-safe MemoryChannel would be generally more useful, if a bit more complex. Thus I began devising an integration test application, knowing that my unit tests could not be of much help here.

I started by writing a simple skeleton test app with a Receiver class (async receive loop) and a Sender class (background thread send loop). The test app started out like this:

Logger logger = new Logger();
MemoryChannel channel = new MemoryChannel();

Receiver receiver = new Receiver(channel, logger, 16);
Sender sender = new Sender(channel, logger, 16, 1);

Task receiverTask = receiver.RunAsync();
Task senderTask = sender.RunAsync();

Task.WaitAll(receiverTask, senderTask);

channel.Dispose();

logger.WriteLine("Done.");

At this point, I had only implemented the receive loop — the Sender was just a single Send operation running on a background thread. But this was enough to consistently trigger an invalid state:

System.InvalidOperationException: A receive operation is already in progress.
at CommSample.MemoryChannel.ReceiveAsync(Byte[] buffer) in CommSample\source\CommSample.Core\MemoryChannel.cs:line 39
at CommSample.Receiver.<RunAsync>d__0.MoveNext() in CommSample\source\CommSample.App\Receiver.cs:line 34
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks)
at CommSample.Program.Main(String[] args) in CommSample\source\CommSample.App\Program.cs:line 24

Using the debugger and setting a breakpoint on the exception condition in MemoryChannel.ReceiveAsync, we can get a more useful stack trace that illustrates the problem:

  • CommSample.Core.dll!CommSample.MemoryChannel.ReceiveAsync(byte[] buffer) Line 36
  • CommSample.App.exe!CommSample.Receiver.RunAsync() Line 34
  • [Resuming Async Method]
  • [ . . . ]
  • mscorlib.dll!System.Threading.Tasks.TaskCompletionSource<int>.SetResult(int result)
  • CommSample.Core.dll!CommSample.MemoryChannel.ReceiveRequest.TryComplete(bool disposing) Line 161
  • CommSample.Core.dll!CommSample.MemoryChannel.Send(byte[] buffer) Line 75
  • CommSample.App.exe!CommSample.Sender.RunInner() Line 41
  • [ . . . ]
  • mscorlib.dll!System.Threading.ThreadHelper.ThreadStart(object obj)

Ah, so it is the synchronous continuation execution triggered by TaskCompletionSource.SetResult that is biting us here! To help explain further, let’s make a small modification to the Receiver loop:

do
{
    this.logger.WriteLine("Before await...");
    try
    {
        bytesRead = await this.channel.ReceiveAsync(buffer);
    }
    catch (Exception e)
    {
        this.logger.WriteLine("Receive threw: {0}", e);
        throw;
    }

    this.logger.WriteLine("After await...");
    totalBytes += bytesRead;
}
while (bytesRead > 0);

Running the app now results in the following output:

[0000.004/T01] Before await...
[0000.008/T01] Sender B=16/F=0x1 starting...
[0000.010/T03] After await...
[0000.010/T03] Before await...
[0000.012/T03] Receive threw: System.InvalidOperationException: A receive operation is already in progress.
at CommSample.MemoryChannel.ReceiveAsync(Byte[] buffer) in CommSample.Core\MemoryChannel.cs:line 39
at CommSample.Receiver.<RunAsync>d__0.MoveNext() in CommSample\source\CommSample.App\Receiver.cs:line 37
[0000.012/T03] Sender B=16/F=0x1 completed. Sent 16 bytes.

As expected, the first await we hit returns control back to the caller with one receive now in progress. At this point, the code after the first await and before the next await is registered as a continuation. The sender then runs and fulfills the pending receive. Because SetResult runs that continuation synchronously, the receive loop resumes on the sender's thread (both "After await..." and the sender's completion message are logged on T03) and calls ReceiveAsync again. That call raises an exception because the code that sets pendingReceive to null hasn't had a chance to run yet.
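To see the ordering in isolation, here is a minimal sketch that uses only a TaskCompletionSource and one awaiting method. The names InlineContinuationDemo and ReceiveOnceAsync are made up for illustration and are not part of CommSample. It assumes a plain console app with no SynchronizationContext and the default TaskCompletionSource options.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class InlineContinuationDemo
{
    // Returns the steps in the order they actually ran.
    public static List<string> Run()
    {
        var order = new List<string>();

        // Default options: awaiting code may be resumed inline by SetResult.
        var tcs = new TaskCompletionSource<int>();

        // Runs synchronously up to the await on the incomplete task, then returns.
        Task receiver = ReceiveOnceAsync(tcs.Task, order);

        order.Add("before SetResult");
        tcs.SetResult(16); // the awaiting method resumes inside this call
        order.Add("after SetResult");

        receiver.Wait();
        return order;
    }

    // Hypothetical stand-in for one iteration of a receive loop.
    private static async Task ReceiveOnceAsync(Task<int> pending, List<string> order)
    {
        int bytesRead = await pending;
        order.Add("continuation saw " + bytesRead + " bytes");
    }

    public static void Main()
    {
        foreach (string step in Run())
        {
            Console.WriteLine(step);
        }
    }
}
```

Under those assumptions the continuation step lands between the "before" and "after" entries, which is the same "gets there first" effect seen in the log above. On .NET Framework 4.6 and later, creating the source with TaskCreationOptions.RunContinuationsAsynchronously avoids the inline resumption, but that alone does not make shared state safe.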

Technically this issue could be induced by a single-threaded unit test with some clever use of ContinueWith. However, the “fix” one would likely make in response would not solve the broad issue of race conditions inherent in the code at this point.
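A single-threaded reproduction along those lines might look like the sketch below. ToyChannel is a hypothetical stand-in that I wrote only to mimic the ordering problem (complete the receive first, clear the field second); it is not the real MemoryChannel, and SingleThreadedRepro is likewise an illustrative name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the channel; NOT the real MemoryChannel.
public sealed class ToyChannel
{
    private TaskCompletionSource<int> pendingReceive;

    public Task<int> ReceiveAsync()
    {
        if (this.pendingReceive != null)
        {
            throw new InvalidOperationException("A receive operation is already in progress.");
        }

        this.pendingReceive = new TaskCompletionSource<int>();
        return this.pendingReceive.Task;
    }

    public void Send(int byteCount)
    {
        // The ordering problem being illustrated: complete first, clear second.
        this.pendingReceive.SetResult(byteCount);
        this.pendingReceive = null;
    }
}

public static class SingleThreadedRepro
{
    // Returns the exception seen by the continuation, or null if none was thrown.
    public static Exception Run()
    {
        var channel = new ToyChannel();
        Exception observed = null;

        Task<int> first = channel.ReceiveAsync();

        // ExecuteSynchronously asks for the continuation to run inside SetResult.
        Task continuation = first.ContinueWith(
            t =>
            {
                try { channel.ReceiveAsync(); }
                catch (InvalidOperationException e) { observed = e; }
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        channel.Send(16);
        continuation.Wait();
        return observed;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Swapping the two lines in Send makes this particular test pass, which is exactly the narrow "fix" that leaves the broader thread-safety problems untouched.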

Instead, I forged ahead and reorganized the code to eliminate thread-safety issues. I started by adding locking to the receive logic. I noted that excessBuffers, being a reference type and a read-only private instance field, would work fine as a lock object. I moved all of the logic of this method under this lock, save for the final conditional logic to complete the result. To prevent deadlocks, you should generally avoid running arbitrary user code (event handlers or, in this case, continuations) under a lock.
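The general shape of that pattern (mutate shared state under the lock, complete the task after leaving it) can be sketched as follows. LockedToyChannel is a simplified, hypothetical channel that passes single int values rather than byte buffers, so treat it as an illustration of the locking idea and not as the actual MemoryChannel code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified channel; NOT the real MemoryChannel.
public sealed class LockedToyChannel
{
    // A read-only private reference-type field doubles as the lock object.
    private readonly LinkedList<int> excess = new LinkedList<int>();
    private TaskCompletionSource<int> pendingReceive;

    public Task<int> ReceiveAsync()
    {
        lock (this.excess)
        {
            if (this.pendingReceive != null)
            {
                throw new InvalidOperationException("A receive operation is already in progress.");
            }

            if (this.excess.Count > 0)
            {
                int value = this.excess.First.Value;
                this.excess.RemoveFirst();
                return Task.FromResult(value);
            }

            this.pendingReceive = new TaskCompletionSource<int>();
            return this.pendingReceive.Task;
        }
    }

    public void Send(int value)
    {
        TaskCompletionSource<int> toComplete;
        lock (this.excess)
        {
            // Detach the pending receive while holding the lock.
            toComplete = this.pendingReceive;
            this.pendingReceive = null;
            if (toComplete == null)
            {
                this.excess.AddLast(value);
            }
        }

        // Continuations may run inline here, so complete outside the lock.
        if (toComplete != null)
        {
            toComplete.SetResult(value);
        }
    }
}

public static class LockedDemo
{
    // A continuation that immediately receives again no longer throws.
    public static int[] Run()
    {
        var channel = new LockedToyChannel();
        Task<int> first = channel.ReceiveAsync();
        Task<int> second = null;

        Task continuation = first.ContinueWith(
            t => { second = channel.ReceiveAsync(); },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        channel.Send(16);
        continuation.Wait();
        channel.Send(32);
        return new[] { first.Result, second.Result };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because the pending receive is detached under the lock before it is completed, a continuation that re-enters ReceiveAsync sees a clean state, and no user code ever runs while the lock is held.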

This was a fine start but it was only the beginning. With locking, everyone must participate in order to ensure thread-safety. To complete the implementation, I added locking to send (with one later fix) and dispose.

At this point, I was ready to complete the real integration test application. The story continues in the next post…

5 thoughts on “MemoryChannel and concurrency”

  1. ranyao

    The Race condition arises because we have the continuation task running in parallel with the code after SetResult() right? Could we simply move SetResult to the end of the method call? I mean move “set pendingReceive to null” before SetResult?

    1. Brian Rogers Post author

      Not quite. The continuations in this case are synchronous so they aren’t running in parallel. The continuation logically “gets there first” and sees the still-assigned value. Moving the code around a bit as you described could fix that one race condition but it won’t solve the larger issues I noted (e.g. unsafe multi-threaded access to shared state).

    1. Brian Rogers Post author

      Not exactly. Using ‘await’ wouldn’t guarantee that we’d hit this or any other similar race condition. It’s really about the sequencing of calls — in this case, having a looping sender and receiver sharing threads.
