Async fixed concurrency workflows

In the previous post, I mentioned async fixed concurrency workflows as providing the best balance between resource utilization, latency, and throughput. Let’s explore two ways to build such a workflow.

Single-threaded workflow

The first design we will look at involves a single-threaded workflow. The idea is that we want many calls in flight at a given moment, but we only want to schedule calls and handle their completion on a single thread. The advantage is that we don’t need any locking while preparing each call; the trade-off is that we will never launch more than one call at a time.

We’ll start with an OperationManager class and the minimal interface:

public class OperationManager
{
    public OperationManager(int maxPendingCalls, Func<Task> doAsync)
    {
    }

    public Task RunAsync(int totalCalls)
    {
        throw new NotImplementedException();
    }
}

After a few basic unit tests (Max of one allows only one call at a time, Max of one with call count 3 allows only one call at a time for 3 iterations), we get to the point where we need to handle multiple calls in flight concurrently. How will we proceed?

The basic idea is that we want to use a semaphore to limit the concurrency level to our max pending call count. Before launching the call, we should wait for the semaphore. After the call has completed, it should release the semaphore. Of course, we need an async semaphore in order to avoid blocking. SemaphoreSlim happens to have WaitAsync methods, but in this implementation I chose to go with Stephen Toub’s AsyncSemaphore instead; it is more async TDD-friendly since it runs continuations synchronously instead of on the thread pool like SemaphoreSlim.
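The wait-launch-release pattern described above can be sketched roughly as follows. This is a simplified illustration, not the implementation from the sample project: it swaps in the built-in SemaphoreSlim for Stephen Toub’s AsyncSemaphore so that the block is self-contained, and the CallAndReleaseAsync helper is a hypothetical name. Because SemaphoreSlim may resume the loop on a thread pool thread, the launches here are sequential but not pinned to one particular thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class OperationManager
{
    private readonly int maxPendingCalls;
    private readonly Func<Task> doAsync;

    public OperationManager(int maxPendingCalls, Func<Task> doAsync)
    {
        this.maxPendingCalls = maxPendingCalls;
        this.doAsync = doAsync;
    }

    public async Task RunAsync(int totalCalls)
    {
        // Assumption: SemaphoreSlim stands in for AsyncSemaphore here.
        using (SemaphoreSlim semaphore = new SemaphoreSlim(this.maxPendingCalls))
        {
            List<Task> calls = new List<Task>();
            for (int i = 0; i < totalCalls; ++i)
            {
                // Wait (without blocking a thread) until a slot is free,
                // then launch exactly one call from this loop.
                await semaphore.WaitAsync();
                calls.Add(this.CallAndReleaseAsync(semaphore));
            }

            // Wait for the calls still in flight; this also surfaces failures.
            await Task.WhenAll(calls);
        }
    }

    // Hypothetical helper: runs one call and frees its slot when it finishes,
    // whether it succeeded or failed.
    private async Task CallAndReleaseAsync(SemaphoreSlim semaphore)
    {
        try
        {
            await this.doAsync();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
```

Keeping every task in a list until the end is the simplest way to observe failures; a longer-running workflow would likely want to check and discard completed tasks as it goes.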

Multi-threaded workflow

Implementing a multi-threaded workflow is much simpler. In this case, you only need to launch a series of async loops, up to the max pending call count. Calls will be launched and completed concurrently in this case, so be aware of any shared state that you are manipulating and add proper locking where necessary.
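As a rough sketch of the loop-per-slot idea, assuming the same constructor shape as OperationManager: each loop claims one of the remaining calls, awaits it, and repeats until none are left. The class name ParallelOperationManager and the shared remainingCalls counter are illustrative choices, not taken from the sample project. The counter is the only shared state here, so an Interlocked decrement is enough; anything more complex would need a lock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class ParallelOperationManager
{
    private readonly int maxPendingCalls;
    private readonly Func<Task> doAsync;
    private int remainingCalls;

    public ParallelOperationManager(int maxPendingCalls, Func<Task> doAsync)
    {
        this.maxPendingCalls = maxPendingCalls;
        this.doAsync = doAsync;
    }

    // Note: this sketch assumes only one RunAsync is active at a time,
    // because remainingCalls is shared instance state.
    public Task RunAsync(int totalCalls)
    {
        this.remainingCalls = totalCalls;

        // One async loop per concurrency slot; each may run on its own thread.
        Task[] loops = new Task[this.maxPendingCalls];
        for (int i = 0; i < loops.Length; ++i)
        {
            loops[i] = Task.Run(() => this.RunLoopAsync());
        }

        return Task.WhenAll(loops);
    }

    private async Task RunLoopAsync()
    {
        // Atomically claim one call; stop once the counter drops below zero.
        while (Interlocked.Decrement(ref this.remainingCalls) >= 0)
        {
            await this.doAsync();
        }
    }
}
```

Since each loop has at most one call outstanding, the number of loops is the concurrency limit and no semaphore is needed.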

To see how all this code evolved, you can get the full commit history here: History for writeasync/projects/ConcurrentSample

2 thoughts on “Async fixed concurrency workflows”

  1. ranyao

    One question regarding capturing exceptions from the completed tasks to fast fail:

    The async semaphore limits the concurrency level so that once one call completes and releases the semaphore, the next call can enter immediately. However, HandleCompletedCalls seems to prevent the next call from entering, since it appears to block until all pending tasks are done (it requires pending.Count == 0 to exit). If one of the pending tasks has completed while another has not, it seems to me that the next call would not be able to enter. Could we just loop through all pending tasks and check once after each await asyncSemaphore.WaitAsync() call?

    private static void HandleCompletedCalls(Queue<Task> pending, IList<Exception> exceptions)
    {
        // Examine tasks in launch order, stopping at the first one still running.
        while (pending.Count > 0)
        {
            Task task = pending.Peek();
            if (task.IsCompleted)
            {
                pending.Dequeue();
                if (task.Exception != null)
                {
                    exceptions.Add(task.Exception);
                }
            }
            else
            {
                return;
            }
        }
    }

    1. Brian Rogers Post author

      The handle completion code is not responsible for “completing” the tasks. They have already completed (and hence already exited the semaphore). We are just checking the outcome of the task.
