Native InputQueue thread-safety

In the previous post, I introduced my port of InputQueue to C++. As usual, the unit tests drove the creation of a correct single-threaded implementation, but a bit more work was needed to make the code thread-safe.

The go-to construct for mutual exclusion between threads in Windows is the critical section. Perhaps C programmers would be satisfied with calling InitializeCriticalSection, EnterCriticalSection, and LeaveCriticalSection directly, but this is the modern era! For exception-safe, less error-prone code, we should be using RAII techniques. Conveniently, the PPL provides just the thing: the concurrency::critical_section class and its associated critical_section::scoped_lock. This will do nicely as a C++ analog to System.Threading.Monitor and the lock statement, with one caveat: unlike Monitor, critical_section is not reentrant. Here’s an example of it in action, in the InputQueue::Enqueue method:

void Enqueue(T item)
{
    std::unique_ptr<concurrency::task_completion_event<T>> current;
    {
        concurrency::critical_section::scoped_lock lock(syncRoot_);
        if (!pending_)
        {
            items_.push(item);
        }
        else
        {
            current = std::move(pending_);
        }
    }

    if (current)
    {
        current->set(item);
    }
}

Note the use of scoping braces to ensure that the lock is held only as long as necessary and then released (upon destruction) before attempting to set the task completion event.
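
To see the same pattern without the PPL, here is a minimal sketch that uses only the standard library. It assumes std::mutex and std::lock_guard as stand-ins for critical_section and scoped_lock, and a std::function callback in place of the task completion event. CallbackQueue and its Dequeue method are hypothetical names for illustration, not part of the actual InputQueue port.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the queue in this post. It supports a single
// outstanding consumer, stored as a callback instead of a completion event.
template <typename T>
class CallbackQueue
{
public:
    void Enqueue(T item)
    {
        std::function<void(T)> current;
        {
            // The lock is held only inside these braces.
            std::lock_guard<std::mutex> lock(syncRoot_);
            if (!pending_)
            {
                items_.push(std::move(item));
                return;
            }

            current = std::move(pending_);
            pending_ = nullptr; // a moved-from std::function is unspecified
        }

        // The lock has been released, so the callback may re-enter the queue.
        current(std::move(item));
    }

    void Dequeue(std::function<void(T)> handler)
    {
        assert(handler);
        std::unique_lock<std::mutex> lock(syncRoot_);
        if (items_.empty())
        {
            pending_ = std::move(handler);
            return;
        }

        T item = std::move(items_.front());
        items_.pop();
        lock.unlock(); // release before running user code
        handler(std::move(item));
    }

private:
    std::mutex syncRoot_;
    std::queue<T> items_;
    std::function<void(T)> pending_;
};
```

Calling Enqueue and then Dequeue invokes the handler immediately with the queued item, while calling Dequeue first parks the handler until the next Enqueue. In both cases the handler runs outside the lock, so it can safely call back into the queue.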

To gain confidence that the implementation was truly thread-safe, I ported the integration test app from the managed project to native code. The app runs concurrent asynchronous loops that continuously enqueue and dequeue items from the queue. Since we don’t have async/await in C++, achieving this is relatively complicated. Thanks to a bit of help from the PPL team, however, the job is made much easier: with create_iterative_task, writing an asynchronous loop is no longer a chore. Compare the two implementations of the enqueue loop, first in C# and then in C++, and note the similarities:

private static async Task EnqueueLoopAsync(IProducerQueue<int> queue, CancellationToken token)
{
    await Task.Yield();

    int i = 0;
    while (!token.IsCancellationRequested)
    {
        ++i;
        queue.Enqueue(i);
        await Task.Delay(1);
    }
}

template<typename TProducerQueue>
concurrency::task<void> EnqueueLoopAsync(TProducerQueue & queue, concurrency::cancellation_token token)
{
    return concurrency::extras::create_delayed_task(std::chrono::milliseconds(1), [&queue, token]
    {
        std::shared_ptr<int> i = std::make_shared<int>();
        return concurrency::extras::create_iterative_task([&queue, token, i]()
        {
            if (token.is_canceled())
            {
                return concurrency::task_from_result(false);
            }

            int next = ++*i;
            queue.Enqueue(next);
            return concurrency::extras::create_delayed_task(std::chrono::milliseconds(1), []() { return true; });
        });
    });
}

With C++, you have to be more explicit about how the memory is managed. The loop counter lives in a shared_ptr so that every copy of the loop lambda sees the same value, and the queue is captured by reference, so the caller must keep it alive until the returned task completes. But structurally the code is similar. To “break” out of the loop, note that we need to return a task with a false result. In this app, create_delayed_task does double duty: the first call acts as an initial “yield”, and the call at the end of each iteration acts as an async rate-limiting delay.
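
The loop contract described above (report true to keep going, false to stop) can be sketched in standard C++ with continuations. This is only an illustration of the idea, not how create_iterative_task is implemented: iterate, LoopBody, Continuation, and count_to are all hypothetical names, and each iteration here completes synchronously rather than on a timer or scheduler.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical names throughout; this is not the PPL API.
using Continuation = std::function<void(bool)>;
using LoopBody = std::function<void(Continuation)>;

// Runs body repeatedly. Each iteration reports true to continue or false to
// stop, and done runs once after the final iteration. A real implementation
// would schedule the next iteration instead of recursing on the stack.
inline void iterate(LoopBody body, std::function<void()> done)
{
    assert(body && done);
    body([body, done](bool keepGoing)
    {
        if (keepGoing)
        {
            iterate(body, done);
        }
        else
        {
            done();
        }
    });
}

// Counts up to limit. The counter is heap-allocated and captured by value,
// so every copy of the loop lambda shares the same int.
inline int count_to(int limit)
{
    auto i = std::make_shared<int>(0);
    bool finished = false;
    iterate(
        [i, limit](Continuation next)
        {
            if (*i >= limit)
            {
                next(false); // the equivalent of returning a false task
                return;
            }

            ++*i;
            next(true);
        },
        [&finished] { finished = true; });
    return finished ? *i : -1;
}
```

Because the body lambda is copied on every iteration, a plain int captured by value would be reset to its captured value in each copy. Sharing the counter through a shared_ptr is what keeps the count advancing.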

Overall, the PPL is a great tool to have if you want to write correct and efficient asynchronous native code.
