Orchestrating race conditions

Many a programmer has struggled with unit tests and those pesky race conditions that are
seemingly
immune to them. Is it even possible to verify concurrency correctness using TDD? I am going to tell you that it is — sometimes. There are two prerequisites: you have to know that a particular race condition is possible, and you have to be able to deterministically expose it.

Awareness of a race can arise in several ways. It could be a thought exercise based on careful analysis of the code. Other times it is through a separate activity such as a multi-threaded load test. No matter how it comes up, a TDD practitioner upon learning of the race will not want to leave reproducibility to chance. But being able to expose such a race in a unit test will obviously require a “hook” to orchestrate a specific execution sequence. Basically, we need what Michael Feathers calls a seam.

There are a few examples of this on the web already. For example, Stephen Vance‘s article Reproducing Race Conditions in Tests shows a Java-based approach using log4j as an injection mechanism. Fortunately in asynchronous .NET code, a method returning a Task already provides a pretty good seam as a starting point, assuming we have some control of the creation of the task.

Consider a simple example of a process that is restarted on demand if it has crashed or exited. Let’s call this ImmortalProcess. The first few behavioral specifications are pretty simple: we need the process to be created on first use, reused on subsequent calls, and recreated on the next use after a fault (notified by an event). The code and associated tests might look something like this:

public class ImmortalProcess
{
    private readonly Func<Task<IProcess>> createProcessAsync;

    private IProcess cachedProcess;

    public ImmortalProcess(Func<Task<IProcess>> createProcessAsync)
    {
        this.createProcessAsync = createProcessAsync;
    }

    public async Task<int> GetIdAsync()
    {
        if (this.cachedProcess == null)
        {
            this.cachedProcess = await this.createProcessAsync();
            this.cachedProcess.Faulted += this.OnProcessFaulted;
        }

        return this.cachedProcess.Id;
    }

    private void OnProcessFaulted(object sender, EventArgs e)
    {
        this.cachedProcess = null;
    }
}

public interface IProcess
{
    int Id { get; }

    event EventHandler Faulted;
}

[TestClass]
public class ImmortalProcessTest
{
    [TestMethod]
    public void ShouldCreateProcessOnFirstCall()
    {
        ImmortalProcess process = new ImmortalProcess(() => Task.FromResult<IProcess>(new ProcessStub() { Id = 0xA }));

        Task<int> firstCall = process.GetIdAsync();

        Assert.IsTrue(firstCall.IsCompleted);
        Assert.AreEqual(0xA, firstCall.Result);
    }

    [TestMethod]
    public void ShouldReuseCreatedProcessOnSecondCall()
    {
        int currentId = 0;
        ImmortalProcess process = new ImmortalProcess(() => Task.FromResult<IProcess>(new ProcessStub() { Id = ++currentId }));

        Task<int> firstCall = process.GetIdAsync();
        Assert.IsTrue(firstCall.IsCompleted);

        Task<int> secondCall = process.GetIdAsync();

        Assert.IsTrue(secondCall.IsCompleted);
        Assert.AreEqual(1, currentId);
        Assert.AreEqual(1, secondCall.Result);
    }

    [TestMethod]
    public void ShouldRecreateProcessOnNextCallAfterFault()
    {
        int currentId = 0;
        ProcessStub currentProcess = null;
        ImmortalProcess process = new ImmortalProcess(() => Task.FromResult<IProcess>(currentProcess = new ProcessStub() { Id = ++currentId }));

        Task<int> firstCall = process.GetIdAsync();
        Assert.IsTrue(firstCall.IsCompleted);

        Assert.IsNotNull(currentProcess);

        currentProcess.RaiseFaulted();

        Task<int> secondCall = process.GetIdAsync();

        Assert.IsTrue(secondCall.IsCompleted);
        Assert.AreEqual(2, currentId);
        Assert.AreEqual(2, secondCall.Result);
    }

    private sealed class ProcessStub : IProcess
    {
        public int Id { get; set; }

        public event EventHandler Faulted;

        public void RaiseFaulted()
        {
            this.Faulted(null, null);
        }
    }
}

This code is not thread-safe whatsoever. So let’s introduce a new requirement — the process creation must be done once only by the first caller. By inspection we know the current code won’t meet this condition, so let’s write the test to expose the issue. We can take advantage of the natural seam provided by the createProcessAsync delegate to control the lifetime of all returned Tasks:

[TestMethod]
public void ShouldCreateProcessOnceOnlyByFirstConcurrentCaller()
{
    TaskCompletionSource<IProcess> currentTask = null;
    Func<Task<IProcess>> createProcessAsync = delegate
    {
        currentTask = new TaskCompletionSource<IProcess>();
        return currentTask.Task;
    };

    ImmortalProcess process = new ImmortalProcess(createProcessAsync);

    Task<int> firstCall = process.GetIdAsync();
    Assert.IsFalse(firstCall.IsCompleted);
    Assert.IsNotNull(currentTask);

    TaskCompletionSource<IProcess> firstTask = currentTask;
    currentTask = null;

    Task<int> secondCall = process.GetIdAsync();
    Assert.IsFalse(secondCall.IsCompleted);
    Assert.IsNull(currentTask); // FAILS

    firstTask.SetResult(new ProcessStub() { Id = 0x12ACE });

    Assert.IsTrue(firstCall.IsCompleted);
    Assert.AreEqual(0x12ACE, firstCall.Result);

    Assert.IsTrue(secondCall.IsCompleted);
    Assert.AreEqual(0x12ACE, secondCall.Result);
}

With the code as written, the assertion marked // FAILS will not pass because both callers are incorrectly allowed to create the process. Given that the process creation method is async, we will fix this by introducing an asynchronous exclusive lock (see “Building an async exclusive lock” for an implementation).

public class ImmortalProcess
{
    private readonly ExclusiveLock exclusiveLock;
    // ...
    public ImmortalProcess(Func<Task<IProcess>> createProcessAsync)
    {
        // ...
        this.exclusiveLock = new ExclusiveLock();
    }

    public async Task<int> GetIdAsync()
    {
        if (this.cachedProcess == null)
        {
            ExclusiveLock.Token token = await this.exclusiveLock.AcquireAsync();
            try
            {
                if (this.cachedProcess == null)
                {
                    this.cachedProcess = await this.createProcessAsync();
                    this.cachedProcess.Faulted += this.OnProcessFaulted;
                }
            }
            finally
            {
                this.exclusiveLock.Release(token);
            }
        }

        return this.cachedProcess.Id;
    }
    // ...
}

First race condition down! (Note: For brevity, I’ve omitted two tests that would show the lock is released properly on synchronous and asynchronous exceptions from createChannelAsync.)

There is still a very subtle race condition here, though. Imagine that the Faulted event is raised sometime between creation and first use of the process. Since we set the cached process to null on fault, we would probably see a NullReferenceException! Instead, what we’d rather do in this case is use the initially created process; the call would likely still fail but with a more sensible error (e.g. the process has exited/is not available). But now we’re faced with a problem: how can we cause the fault happen deterministically after creation and event subscription, but before the first call we make?

The only code between these two events right now is a call to release the lock, which might be an interesting place for a seam. So let’s introduce a new locking pattern via a LockAsync delegate to return an IDisposable representing the lock scope. By default we’ll wire it up to ExclusiveLock, but via property injection the caller can redirect it to a custom implementation:

public class ImmortalProcess
{
    // ...
    public ImmortalProcess(Func<Task<IProcess>> createProcessAsync)
    {
        // ...
        this.LockAsync = this.AcquireAsync;
    }

    public Func<Task<IDisposable>> LockAsync { get; set; }

    public async Task<int> GetIdAsync()
    {
        if (this.cachedProcess == null)
        {
            using (await this.LockAsync())
            {
                if (this.cachedProcess == null)
                {
                    this.cachedProcess = await this.createProcessAsync();
                    this.cachedProcess.Faulted += this.OnProcessFaulted;
                }
            }
        }

        return this.cachedProcess.Id;
    }
    // ...
    private async Task<IDisposable> AcquireAsync()
    {
        ExclusiveLock.Token token = await this.exclusiveLock.AcquireAsync();
        return new TokenWrapper(this.exclusiveLock, token);
    }

    private sealed class TokenWrapper : IDisposable
    {
        private readonly ExclusiveLock parent;
        private readonly ExclusiveLock.Token token;

        public TokenWrapper(ExclusiveLock parent, ExclusiveLock.Token token)
        {
            this.parent = parent;
            this.token = token;
        }

        public void Dispose()
        {
            this.parent.Release(this.token);
        }
    }
}

Now we can write the failing test:

[TestMethod]
public void ShouldUseInitiallyCreatedProcessIfFaultedBeforeEndOfFirstCall()
{
    int count = 0;
    ProcessStub currentProcess = null;
    ImmortalProcess process = new ImmortalProcess(() => Task.FromResult<IProcess>(currentProcess = new ProcessStub() { Id = ++count }));
    TaskCompletionSource<IDisposable> tcs = new TaskCompletionSource<IDisposable>();
    process.LockAsync = () => tcs.Task;
    DisposableStub disposable = new DisposableStub() { OnDisposing = () => currentProcess.RaiseFaulted() };

    Task<int> firstCall = process.GetIdAsync();
    Assert.IsFalse(firstCall.IsCompleted);

    tcs.SetResult(disposable);

    Assert.IsTrue(firstCall.IsCompleted);
    Assert.AreEqual(1, firstCall.Result);
}

As expected, it will fail when getting Result at the end because the task will be faulted due to a null reference. Now for the fix which uses a local variable that will remain unaffected no matter what happens to the cachedProcess reference:

public async Task<int> GetIdAsync()
{
    IProcess current = this.cachedProcess;
    if (current == null)
    {
        using (await this.LockAsync())
        {
            current = this.cachedProcess;
            if (current == null)
            {
                current = await this.createProcessAsync();
                current.Faulted += this.OnProcessFaulted;
                this.cachedProcess = current;
            }
        }
    }

    return current.Id;
}

Race condition two vanquished! Are we done yet? Sadly, no, as there is one more extremely subtle race. What if the process is faulted just before we subscribe to the Faulted event? No matter how hard we try we could miss this event, so we need to do a last-chance check before we decide to publish the value. Failing to do this would result in a “perma-faulted” process object which we would never recreate.

Here is the failing test, which requires a new IsFaulted property:

[TestMethod]
public void ShouldRecreateProcessOnNextCallIfFaultedBeforeSubscribed()
{
    int count = 0;
    ImmortalProcess process = new ImmortalProcess(() => Task.FromResult<IProcess>(new ProcessStub() { IsFaulted = true, Id = ++count }));

    Task<int> firstCall = process.GetIdAsync();
    Assert.IsTrue(firstCall.IsCompleted);

    Task<int> secondCall = process.GetIdAsync();
    Assert.IsTrue(secondCall.IsCompleted);
    Assert.AreEqual(2, secondCall.Result);
}

It will fail as the count will forever be stuck at 1. In the fix, we push the IsFaulted property definition to the IProcess interface and make use of it in ImmortalProcess:

public async Task<int> GetIdAsync()
{
    IProcess current = this.cachedProcess;
    if (current == null)
    {
        using (await this.LockAsync())
        {
            current = this.cachedProcess;
            if (current == null)
            {
                current = await this.createProcessAsync();
                current.Faulted += this.OnProcessFaulted;
                if (!current.IsFaulted)
                {
                    this.cachedProcess = current;
                }
            }
        }
    }

    return current.Id;
}

Say goodbye to race condition three. But in a cruel twist of fate, we have introduced new race condition four. Consider the case where the Faulted event is raised just as we are checking the IsFaulted flag. The cachedProcess would be set to null by the event handler and then we would set it right back again to the now-faulted current process. To reproduce this race, we need to use the IsFaulted property as a seam:

private sealed class ProcessStub : IProcess
{
    private bool isFaulted;
    // ...
    public Action OnGetIsFaulted { get; set; }

    public bool IsFaulted
    {
        get
        {
            if (this.OnGetIsFaulted != null)
            {
                this.OnGetIsFaulted();
            }

            return this.isFaulted;
        }

        set
        {
            this.isFaulted = value;
        }
    }
    //...
}

Now the unit test can inject a call to RaiseFaulted() in the getter:

[TestMethod]
public void ShouldRecreateProcessOnNextCallIfFaultedAfterSubscribed()
{
    int count = 0;
    Func<Task<IProcess>> createProcessAsync = delegate
    {
        ProcessStub currentProcess = new ProcessStub() { Id = ++count };
        currentProcess.OnGetIsFaulted = () => currentProcess.RaiseFaulted();
        return Task.FromResult<IProcess>(currentProcess);
    };

    ImmortalProcess process = new ImmortalProcess(createProcessAsync);

    Task<int> firstCall = process.GetIdAsync();
    Assert.IsTrue(firstCall.IsCompleted);

    Task<int> secondCall = process.GetIdAsync();
    Assert.IsTrue(secondCall.IsCompleted);
    Assert.AreEqual(2, secondCall.Result);
}

This test fails as we had expected with count stuck at 1. To fix this, we need to do sort of the inverse of the current code. We will publish the value to cachedProcess as early as possible and set it to null if it happens to end up faulted. This makes the operation essentially idempotent — no harm will be done if we race with the event handler and set to null again:

public async Task<int> GetIdAsync()
{
    IProcess current = this.cachedProcess;
    if (current == null)
    {
        using (await this.LockAsync())
        {
            current = this.cachedProcess;
            if (current == null)
            {
                current = await this.createProcessAsync();
                this.cachedProcess = current;

                current.Faulted += this.OnProcessFaulted;
                if (current.IsFaulted)
                {
                    this.cachedProcess = null;
                }
            }
        }
    }

    return current.Id;
}

That concludes the removal of the fourth and (I think) final harmful race condition. In practice, it is difficult to be certain that your code will operate correctly in all possible concurrency situations. Armed with a few design strategies and cooperative test doubles, however, your async code can be shown “correct by example” in a straightforward way.

Leave a Reply

Your email address will not be published. Required fields are marked *

Time limit is exhausted. Please reload the CAPTCHA.