InputQueue use cases

In my previous post, I introduced InputQueue. Now for some (hopefully) realistic examples of how you can use it.

Perhaps the most obvious scenario is a command processor with an async dispatch loop, for example:

using System;
using System.Threading.Tasks;

public interface ICommand
{
    Task ExecuteAsync();
}

public sealed class CommandProcessor : IDisposable
{
    private readonly InputQueue<ICommand> queue;
    private readonly Task task;

    public CommandProcessor()
    {
        this.queue = new InputQueue<ICommand>();
        this.task = this.DispatchLoopAsync();
    }

    public void Dispose()
    {
        this.queue.Dispose();
    }

    public void Enqueue(ICommand command)
    {
        this.queue.Enqueue(command);
    }

    private async Task DispatchLoopAsync()
    {
        try
        {
            while (true)
            {
                ICommand command = await this.queue.DequeueAsync();
                await command.ExecuteAsync();
            }
        }
        catch (ObjectDisposedException)
        {
            // The queue was disposed, so exit the dispatch loop quietly.
        }
    }
}

This processor executes one command at a time, in order of arrival in the queue. Best of all, it doesn’t burn a thread while waiting, so you can run many instances and still scale efficiently. Note that there is no error handling in this example: an exception thrown by ExecuteAsync ends the dispatch loop, and because nothing ever awaits the loop's task, the failure goes unobserved. Treat this as demo code only!
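
One way to add basic error handling is to catch failures per command, so that one bad command does not stop the loop. The following is only a minimal sketch under stated assumptions: `MiniQueue<T>` is a hypothetical stand-in for InputQueue (included so the block compiles on its own, and not a copy of the real implementation), while `DelegateCommand`, `SafeCommandProcessor`, the `onError` callback, and the `Completion` property are names invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface ICommand { Task ExecuteAsync(); }

// Hypothetical helper: wraps a delegate as an ICommand for quick experiments.
public sealed class DelegateCommand : ICommand
{
    private readonly Func<Task> action;
    public DelegateCommand(Func<Task> action) { this.action = action; }
    public Task ExecuteAsync() { return this.action(); }
}

// Hypothetical stand-in for InputQueue<T> so the sketch compiles on its own.
// It supports one pending dequeue at a time, which is all the loop needs.
public sealed class MiniQueue<T> : IDisposable
{
    private readonly object sync = new object();
    private readonly Queue<T> items = new Queue<T>();
    private TaskCompletionSource<T> pending;
    private bool disposed;

    public void Enqueue(T item)
    {
        TaskCompletionSource<T> waiter;
        lock (this.sync)
        {
            if (this.disposed) { throw new ObjectDisposedException("MiniQueue"); }
            waiter = this.pending;
            this.pending = null;
            if (waiter == null) { this.items.Enqueue(item); }
        }

        // Complete the waiting dequeue outside the lock.
        if (waiter != null) { waiter.SetResult(item); }
    }

    public Task<T> DequeueAsync()
    {
        lock (this.sync)
        {
            if (this.disposed) { throw new ObjectDisposedException("MiniQueue"); }
            if (this.items.Count > 0) { return Task.FromResult(this.items.Dequeue()); }
            this.pending = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            return this.pending.Task;
        }
    }

    public void Dispose()
    {
        TaskCompletionSource<T> waiter;
        lock (this.sync)
        {
            this.disposed = true;
            waiter = this.pending;
            this.pending = null;
        }

        // Fail the waiting dequeue (if any) so the dispatch loop can exit.
        if (waiter != null) { waiter.SetException(new ObjectDisposedException("MiniQueue")); }
    }
}

public sealed class SafeCommandProcessor : IDisposable
{
    private readonly MiniQueue<ICommand> queue = new MiniQueue<ICommand>();
    private readonly Action<Exception> onError;

    public SafeCommandProcessor(Action<Exception> onError)
    {
        this.onError = onError;
        this.Completion = this.DispatchLoopAsync();
    }

    // Completes once the queue is disposed and the loop has exited.
    public Task Completion { get; }

    public void Enqueue(ICommand command) { this.queue.Enqueue(command); }

    public void Dispose() { this.queue.Dispose(); }

    private async Task DispatchLoopAsync()
    {
        while (true)
        {
            ICommand command;
            try { command = await this.queue.DequeueAsync(); }
            catch (ObjectDisposedException) { return; } // Queue disposed: stop dispatching.

            try { await command.ExecuteAsync(); }
            catch (Exception e) { this.onError(e); } // Report the failure and keep the loop alive.
        }
    }
}
```

Separating the two try blocks means an ObjectDisposedException thrown by a command is reported like any other failure instead of being mistaken for queue shutdown. In this stand-in, commands still queued when Dispose is called are dropped; whether the real InputQueue behaves the same way is not something this sketch assumes.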

Another common use for queuing is in resource throttling. This sample code implements a resource pool:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class Pool<T> : IDisposable
{
    private readonly InputQueue<T> queue;

    public Pool(IEnumerable<T> items)
    {
        this.queue = new InputQueue<T>();
        foreach (T item in items)
        {
            this.queue.Enqueue(item);
        }
    }

    public void Dispose()
    {
        this.queue.Dispose();
    }

    public async Task<Handle> TakeAsync()
    {
        T item = await this.queue.DequeueAsync();
        return new Handle(item, this.queue);
    }

    public struct Handle : IDisposable
    {
        private readonly T item;

        private IProducerQueue<T> queue;

        public Handle(T item, IProducerQueue<T> queue)
        {
            this.item = item;
            this.queue = queue;
        }

        public T Item
        {
            get { return this.item; }
        }

        public void Dispose()
        {
            IProducerQueue<T> localQueue = this.queue;
            if (localQueue != null)
            {
                this.queue = null;
                localQueue.Enqueue(this.item);
            }
        }
    }
}

The number of items in the pool controls the allowed concurrency level. If the pool becomes empty, the requestor must wait (asynchronously) until a previous item is returned. Returning an item is achieved by disposing the Handle struct that the pool gives to the caller. As an example, consider a fixed set of clients and a number of concurrent asynchronous send loops:

public interface IClient
{
    Task SendAsync(string data);
}


private static async Task RunSendersAsync(int senderCount, CancellationToken token)
{
    // CreateClientsAsync is assumed to be defined elsewhere.
    IEnumerable<IClient> clients = await CreateClientsAsync();

    // Dispose the pool only after every send loop has finished and returned its handle.
    using (Pool<IClient> pool = new Pool<IClient>(clients))
    {
        Task[] tasks = new Task[senderCount];
        for (int i = 0; i < senderCount; ++i)
        {
            tasks[i] = SendLoopAsync(pool, token);
        }

        await Task.WhenAll(tasks);
    }
}

private static async Task SendLoopAsync(Pool<IClient> pool, CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        await Task.Delay(100);
        using (Pool<IClient>.Handle handle = await pool.TakeAsync())
        {
            await handle.Item.SendAsync("hello");
        }
    }
}
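
To see that the pool size really does cap concurrency, you could count how many workers hold a handle at the same moment. The sketch below is self-contained and rests on assumptions: `DemoPool<T>` is a hypothetical stand-in that mirrors the TakeAsync/Handle shape of the pool above but uses a plain Queue<T> plus a list of waiters instead of InputQueue, and `PoolDemo.MeasureMaxConcurrencyAsync` is a name made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Pool<T>; not built on the real InputQueue<T>.
public sealed class DemoPool<T>
{
    private readonly object sync = new object();
    private readonly Queue<T> items;
    private readonly Queue<TaskCompletionSource<T>> waiters = new Queue<TaskCompletionSource<T>>();

    public DemoPool(IEnumerable<T> items) { this.items = new Queue<T>(items); }

    public async Task<Handle> TakeAsync()
    {
        Task<T> pending;
        lock (this.sync)
        {
            if (this.items.Count > 0)
            {
                pending = Task.FromResult(this.items.Dequeue());
            }
            else
            {
                // Pool is empty: park the caller until an item is returned.
                var waiter = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
                this.waiters.Enqueue(waiter);
                pending = waiter.Task;
            }
        }

        return new Handle(await pending, this);
    }

    private void Return(T item)
    {
        TaskCompletionSource<T> waiter;
        lock (this.sync)
        {
            if (this.waiters.Count == 0) { this.items.Enqueue(item); return; }
            waiter = this.waiters.Dequeue();
        }

        waiter.SetResult(item); // Hand the item straight to the oldest waiter.
    }

    public sealed class Handle : IDisposable
    {
        private DemoPool<T> pool;
        internal Handle(T item, DemoPool<T> pool) { this.Item = item; this.pool = pool; }
        public T Item { get; }

        public void Dispose()
        {
            // Interlocked.Exchange makes a second Dispose a no-op.
            DemoPool<T> local = Interlocked.Exchange(ref this.pool, null);
            if (local != null) { local.Return(this.Item); }
        }
    }
}

public static class PoolDemo
{
    // Runs workerCount workers against a pool of poolSize items and returns
    // the highest number of workers that held a handle at the same time.
    public static async Task<int> MeasureMaxConcurrencyAsync(int poolSize, int workerCount)
    {
        var pool = new DemoPool<int>(Enumerable.Range(0, poolSize));
        object gate = new object();
        int current = 0;
        int max = 0;

        async Task WorkerAsync()
        {
            using (await pool.TakeAsync())
            {
                lock (gate) { current++; max = Math.Max(max, current); }
                await Task.Delay(20); // Simulated work while holding the item.
                lock (gate) { current--; }
            }
        }

        await Task.WhenAll(Enumerable.Range(0, workerCount).Select(_ => WorkerAsync()));
        return max;
    }
}
```

Calling `await PoolDemo.MeasureMaxConcurrencyAsync(2, 6)` returns a value that never exceeds 2, however many workers are started, because only two handles can be outstanding at once.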

There are certainly countless other ways you could leverage an InputQueue. Try it out!
