{"id":1571,"date":"2014-01-31T13:00:37","date_gmt":"2014-01-31T13:00:37","guid":{"rendered":"http:\/\/writeasync.net\/?p=1571"},"modified":"2014-01-29T08:17:32","modified_gmt":"2014-01-29T08:17:32","slug":"inputqueue-use-cases","status":"publish","type":"post","link":"http:\/\/writeasync.net\/?p=1571","title":{"rendered":"InputQueue use cases"},"content":{"rendered":"<p>In my <a href=\"http:\/\/writeasync.net\/?p=1511\">previous post<\/a>, I introduced <code>InputQueue<\/code>. Now for some (hopefully) realistic examples of how you can use it.<\/p>\n<p>Perhaps the most obvious scenario is a command processor with an async dispatch loop, for example:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic interface ICommand\r\n{\r\n    Task ExecuteAsync();\r\n}\r\n\r\npublic sealed class CommandProcessor : IDisposable\r\n{\r\n    private readonly InputQueue&lt;ICommand&gt; queue;\r\n    private readonly Task task;\r\n\r\n    public CommandProcessor()\r\n    {\r\n        this.queue = new InputQueue&lt;ICommand&gt;();\r\n        this.task = this.DispatchLoopAsync();\r\n    }\r\n\r\n    public void Dispose()\r\n    {\r\n        this.queue.Dispose();\r\n    }\r\n\r\n    public void Enqueue(ICommand command)\r\n    {\r\n        this.queue.Enqueue(command);\r\n    }\r\n\r\n    private async Task DispatchLoopAsync()\r\n    {\r\n        try\r\n        {\r\n            while (true)\r\n            {\r\n                ICommand command = await this.queue.DequeueAsync();\r\n                await command.ExecuteAsync();\r\n            }\r\n        }\r\n        catch (ObjectDisposedException)\r\n        {\r\n            \/\/ The queue was disposed, so exit the dispatch loop.\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>This processor executes one command at a time, in order of arrival in the queue. Best of all, it doesn&#8217;t burn a thread, so you can run many instances while still maintaining scalability and efficiency. 
Note that <strong>there is no error handling in this example<\/strong>, so treat this as demo code only!<\/p>\n<p>Another common use for queuing is in <a href=\"http:\/\/en.wikipedia.org\/wiki\/Throttling_process_(computing)\">resource throttling<\/a>. This sample code implements a <a href=\"http:\/\/en.wikipedia.org\/wiki\/Pool_(computer_science)\">resource pool<\/a>:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic sealed class Pool&lt;T&gt; : IDisposable\r\n{\r\n    private readonly InputQueue&lt;T&gt; queue;\r\n\r\n    public Pool(IEnumerable&lt;T&gt; items)\r\n    {\r\n        this.queue = new InputQueue&lt;T&gt;();\r\n        foreach (T item in items)\r\n        {\r\n            this.queue.Enqueue(item);\r\n        }\r\n    }\r\n\r\n    public void Dispose()\r\n    {\r\n        this.queue.Dispose();\r\n    }\r\n\r\n    public async Task&lt;Handle&gt; TakeAsync()\r\n    {\r\n        T item = await this.queue.DequeueAsync();\r\n        return new Handle(item, this.queue);\r\n    }\r\n\r\n    public struct Handle : IDisposable\r\n    {\r\n        private readonly T item;\r\n\r\n        private IProducerQueue&lt;T&gt; queue;\r\n\r\n        public Handle(T item, IProducerQueue&lt;T&gt; queue)\r\n        {\r\n            this.item = item;\r\n            this.queue = queue;\r\n        }\r\n\r\n        public T Item\r\n        {\r\n            get { return this.item; }\r\n        }\r\n\r\n        public void Dispose()\r\n        {\r\n            IProducerQueue&lt;T&gt; localQueue = this.queue;\r\n            if (localQueue != null)\r\n            {\r\n                this.queue = null;\r\n                localQueue.Enqueue(this.item);\r\n            }\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>The number of items in the pool controls the allowed concurrency level. If the pool becomes empty, the requestor must wait (asynchronously) until a previous item is returned. 
To return an item, the caller disposes the <code>Handle<\/code> struct that the pool handed out. As an example, consider a fixed set of clients shared by a number of concurrent asynchronous send loops:<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\npublic interface IClient\r\n{\r\n    Task SendAsync(string data);\r\n}\r\n\r\n\r\nprivate static async Task RunSendersAsync(int senderCount, CancellationToken token)\r\n{\r\n    IEnumerable&lt;IClient&gt; clients = await CreateClientsAsync();\r\n    Pool&lt;IClient&gt; pool = new Pool&lt;IClient&gt;(clients);\r\n\r\n    Task&#x5B;] tasks = new Task&#x5B;senderCount];\r\n    for (int i = 0; i &lt; senderCount; ++i)\r\n    {\r\n        tasks&#x5B;i] = SendLoopAsync(pool, token);\r\n    }\r\n\r\n    await Task.WhenAll(tasks);\r\n}\r\n\r\nprivate static async Task SendLoopAsync(Pool&lt;IClient&gt; pool, CancellationToken token)\r\n{\r\n    while (!token.IsCancellationRequested)\r\n    {\r\n        await Task.Delay(100);\r\n        using (Pool&lt;IClient&gt;.Handle handle = await pool.TakeAsync())\r\n        {\r\n            await handle.Item.SendAsync(&quot;hello&quot;);\r\n        }\r\n    }\r\n}\r\n<\/pre>\n<p>There are certainly countless other ways you could leverage an <code>InputQueue<\/code>. Try it out!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In my previous post, I introduced InputQueue. Now for some (hopefully) realistic examples of how you can use it. 
Perhaps the most obvious scenario is a command processor with an async dispatch loop, for example: public interface ICommand { Task&hellip; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21],"tags":[],"class_list":["post-1571","post","type-post","status-publish","format-standard","hentry","category-async"],"_links":{"self":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/1571","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1571"}],"version-history":[{"count":0,"href":"http:\/\/writeasync.net\/index.php?rest_route=\/wp\/v2\/posts\/1571\/revisions"}],"wp:attachment":[{"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1571"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1571"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/writeasync.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1571"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}