Introducing overlapped I/O

Spread the love

In Windows, I/O operations such as ReadFile can be performed synchronously or asynchronously. Asynchronous I/O is generally referred to as overlapped I/O since multiple operations can be issued at once and “overlap” in their request lifetimes. There are a few techniques for dealing with overlapped I/O, but they basically boil down to using either alertable I/O or I/O completion ports.

Alertable I/O relies on asynchronous procedure calls and is more limited — in particular, the completion callback always runs on the same thread as the original request and cannot be dispatched until the thread is in “alertable state” (which generally involves calling a blocking wait function like WaitForSingleObjectEx). This really isn’t compatible with the continuation-passing style of asynchronous programming which is implemented by the Parallel Patterns Library in C++ and the Task Parallel Library in .NET.

So that leaves I/O completion ports as the mechanism of choice for native async, and indeed this is the mechanism used under the hood by .NET APIs like FileStream.ReadAsync (assuming of course that FileStream.IsAsync is true). Unfortunately, direct manipulation of completion ports is rather complicated (although Kenny Kerr‘s illuminating blog post “I/O Completion Ports” helps a bit). Luckily there is a (relatively) simpler way with the new thread pool API available since Windows Vista. Using CreateThreadPoolIo, StartThreadPoolIo, etc. an application can use worker threads in the thread pool to efficiently handle I/O completion callbacks.

Let’s walk through an example set of C++ classes for handling asynchronous file reading. (I should note that in the middle of writing this example I saw Artur Laksberg’s blog post “Simplifying Overlapped I/O With PPL” which covers very similar ground, though there is hopefully enough “value-add” in this post to justify its existence.) First, a few core includes, namespaces, and classes:

#include <Windows.h>
#include <string>
#include <iostream>
#include <sstream>
#include <vector>
#include <stdexcept>
#include <memory>
#include <functional>
#include <ppltasks.h>
#include "ppltasks_extra.h"

using namespace concurrency;
using namespace concurrency::extras;
using namespace std;

// Utility function to trace a message with a thread ID
void TraceThread(wstring const & text)
{
    DWORD id = GetCurrentThreadId();
    wstringstream wss;
    wss << L"[T=" << id << L"] " << text << endl;
    wcout << wss.str();
}

// Base class to mark a class as non-copyable
struct DenyCopy
{
    DenyCopy() { }
    ~DenyCopy() { }
    DenyCopy & operator =(DenyCopy const &) = delete;
    DenyCopy(DenyCopy const &) = delete;
};

// Exception created from generic Win32 error code.
class Win32Error : public runtime_error
{
public:
    Win32Error(string const & message, DWORD error)
        : runtime_error(message),
        error_(error)
    {
    }

    ~Win32Error() { }

    DWORD get_Error() const { return error_; }

private:
    DWORD error_;
};

// IO-specific exception associated with a given file.
class IOError : public Win32Error
{
public:
    IOError(string const & message, DWORD error, wstring const & fileName)
        : Win32Error(message, error),
        fileName_(fileName)
    {
    }

    ~IOError() { }

    wstring const & get_FileName() const { return fileName_; }

private:
    wstring fileName_;
};

Now, we need a file handle wrapper that will open a file for reading and prepare it for overlapped I/O. We’ll need use CreateFile (with OPEN_EXISTING) and pass FILE_FLAG_OVERLAPPED otherwise all async I/O attempts will fail:

class FileReadHandle : private DenyCopy
{
public:
    FileReadHandle(wstring const & fileName)
        : fileName_(fileName),
        handle_(CreateFile(fileName.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, nullptr))
    {
        if (handle_ == INVALID_HANDLE_VALUE)
        {
            DWORD error = GetLastError();
            throw IOError("Open failed.", error, fileName);
        }
    }

    ~FileReadHandle()
    {
        CloseHandle(handle_);
    }

    wstring const & get_FileName() const { return fileName_; }

    HANDLE get_Value() { return handle_; }

private:
    wstring fileName_;
    HANDLE handle_;
};

Now we need to add some code for manipulating I/O worker threads. The expected workflow involves multiple steps as follows:

  • Before issuing any I/O requests, CreateThreadPoolIo must be called to associate a callback for the specified handle.
  • Before each I/O request, StartThreadpoolIo must be called.
  • If the I/O request fails, CancelThreadPoolIo must be called to explicitly clean up resources associated with the request.
  • After the final I/O request, CloseThreadPoolIo must be called to clean up thread pool resources. To ensure there are no pending requests still active, WaitForThreadpoolIoCallbacks can be used.

These steps are easy to get wrong, so it is safest to build an RAII pattern with cleanup happening in destructors. There are two classes that work together to manage this, ThreadPoolIO which represents the overall thread pool resources with an associated callback and PendingIO which manages a single pending call. Calling Start signals the start of an async request and returns a PendingIO object. The OnStarted method should be called if the async request is started successfully, otherwise, the I/O operation is safely canceled in the destructor:

class PendingIO : private DenyCopy
{
public:
    PendingIO(PTP_IO io)
        : io_(io)
    {
    }

    ~PendingIO()
    {
        if (io_)
        {
            CancelThreadpoolIo(io_);
        }
    }

    void OnStarted()
    {
        io_ = nullptr;
    }

private:
    PTP_IO io_;
};

template <typename TCallback>
class ThreadPoolIO : private DenyCopy
{
public:
    ThreadPoolIO(HANDLE handle, TCallback callback)
        : io_(),
        callback_(callback)
    {
        io_ = CreateThreadpoolIo(handle, IOCompletedAdapter, this, nullptr);
        if (!io_)
        {
            DWORD error = GetLastError();
            throw Win32Error("CreateThreadPoolIo failed.", error);
        }
    }

    ~ThreadPoolIO()
    {
        WaitForThreadpoolIoCallbacks(io_, FALSE);
        CloseThreadpoolIo(io_);
    }

    PendingIO Start()
    {
        StartThreadpoolIo(io_);
        return PendingIO(io_);
    }

private:
    PTP_IO io_;
    TCallback callback_;

    static void WINAPI IOCompletedAdapter(
        PTP_CALLBACK_INSTANCE instance,
        PVOID context,
        PVOID overlapped,
        ULONG result,
        ULONG_PTR bytesTransferred,
        PTP_IO io)
    {
        static_cast<ThreadPoolIO *>(context)->OnIOCompleted(static_cast<OVERLAPPED *>(overlapped), result, bytesTransferred);
    }

    void OnIOCompleted(OVERLAPPED * overlapped, ULONG result, ULONG_PTR bytesTransferred)
    {
        callback_(overlapped, result, bytesTransferred);
    }
};

Note the use of an adapter method (I’ve sometimes seen this referred to as a “thunk”) for the I/O completion callback. In here we cast the generic context pointer to our this pointer we originally passed and then use it to invoke the member function which in turn executes the real callback we want.

Now for the final piece, the FileReader which does all the actual async I/O work. The interface is very simple — just a ReadAsync method which accepts a buffer (C++-style, using a BYTE vector) and a query method EndOfFile to tell when we’ve reached the end. To construct the reader, we need to pass a FileReadHandle. Internally, this is used to create an ThreadPoolIO instance to which we also pass another callback adapter. The real work happens in the inner ReadRequest class which is an enhanced/augmented OVERLAPPED structure. This enables our callback adapter (which receives the original OVERLAPPED pointer) to unpack the original ReadRequest and call an appropriate member function.

class FileReader : private DenyCopy
{
public:
    FileReader(FileReadHandle & handle)
        : handle_(handle),
        io_(handle.get_Value(), IOCompletedAdapter),
        offset_(0),
        endOfFile_(false)
    {
    }

    ~FileReader()
    {
    }

    bool EndOfFile() const { return endOfFile_; }

    task<int> ReadAsync(vector<BYTE> & buffer)
    {
        unique_ptr<ReadRequest> request = make_unique<ReadRequest>(*this);
        BOOL result = ReadFile(handle_.get_Value(), &buffer[0], static_cast<DWORD>(buffer.size()), nullptr, request.get());
        if (!result)
        {
            request->OnStartError();
        }

        return request.release()->OnStarted();
    }

private:
    class ReadRequest;

    FileReadHandle & handle_;
    ThreadPoolIO<function<void(OVERLAPPED *, ULONG, ULONG_PTR)>> io_;
    unsigned long long offset_;
    bool endOfFile_;

    static void WINAPI IOCompletedAdapter(OVERLAPPED * overlapped, ULONG result, ULONG_PTR bytesTransferred)
    {
        unique_ptr<ReadRequest> request(static_cast<ReadRequest *>(overlapped));
        request->OnCompleted(static_cast<DWORD>(result), static_cast<int>(bytesTransferred));
    }

    class ReadRequest : public OVERLAPPED
    {
    public:
        ReadRequest(FileReader & parent)
            : OVERLAPPED({ 0 }),
            parent_(parent),
            pendingIO_(parent.io_.Start()),
            taskEvent_()
        {
            Offset = parent_.offset_ & 0xFFFFFFFF;
            OffsetHigh = parent_.offset_ >> 32;
        }

        ~ReadRequest()
        {
        }

        void OnStartError()
        {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING)
            {
                throw IOError("Read failed.", error, parent_.handle_.get_FileName());
            }
        }

        task<int> OnStarted()
        {
            pendingIO_.OnStarted();
            return task<int>(taskEvent_);
        }

        void OnCompleted(DWORD error, int bytesRead)
        {
            if (error == ERROR_HANDLE_EOF)
            {
                parent_.endOfFile_ = true;
                error = ERROR_SUCCESS;
            }

            if (error == ERROR_SUCCESS)
            {
                parent_.offset_ += bytesRead;
                taskEvent_.set(bytesRead);
            }
            else
            {
                taskEvent_.set_exception(make_exception_ptr(IOError("Read failed.", error, parent_.handle_.get_FileName())));
            }
        }

    private:
        FileReader & parent_;
        PendingIO pendingIO_;
        task_completion_event<int> taskEvent_;
    };
};

Note the memory management here, achieved by careful use of unique_ptr. The ReadAsync constructs the ReadRequest in the free store, calls ReadFile and then checks the result. If it is not success, we ask the request to process the error. Note that for async requests, we almost always expect an “error” with ERROR_IO_PENDING, so the request code handles this case by simply returning — otherwise, we throw.

In the exception case, the request is safely cleaned up by the unique_ptr destructor. However, in the success case, we release() our interest in the object as we call OnStarted; this is critical to avoid either early or double deletion, since the async request is now in flight and will complete on (and should thus be cleaned up by) another thread. If OnStarted is ultimately called, it will call OnStarted on the PendingIO object (otherwise, on destruction it will be canceled).

When the completion callback arrives (IOCompletedAdapter), we immediately wrap the pointer back into a unique_ptr and jump back into the request. This ensures that we will always clean up the resource at the end of the call. In the OnCompleted method, we simply detect success or failure from the return value. On success, we can advance the read offset for the next call and mark the task as completed (note that we also consider EOF as success). On failure we throw an IOError back to the caller via the task.

Sample code using create_iterative_task to implement an async read loop:

template<typename TCallback>
task<void> ReadLoopAsync(FileReader & reader, int bufferSize, TCallback callback)
{
    TraceThread(L"starting read loop");
    return create_iterative_task([&reader, bufferSize, callback]
    {
        shared_ptr<vector<BYTE>> buffer = make_shared<vector<BYTE>>(bufferSize);
        TraceThread(L"starting read");
        task<int> readTask = reader.ReadAsync(*buffer);
        return readTask.then([buffer, &reader, callback](task<int> t)
        {
            int bytesRead = t.get();
            TraceThread(L"completing read request");
            callback(*buffer, bytesRead);
            bool shouldContinue = (bytesRead > 0) && !reader.EndOfFile();
            return task_from_result(shouldContinue);
        });
    });
}

And finally, an app that reads a file asynchronously in 64-byte chunks, printing the contents to the screen as ASCII characters:

void TraceBufferRead(vector<BYTE> & buffer, int bytesRead)
{
    for (int i = 0; i < bytesRead; ++i)
    {
        cout << static_cast<char>(buffer[i]);
    }

    cout << endl;
}

void ReadSample(wstring const & fileName)
{
    FileReadHandle handle(fileName);
    FileReader reader(handle);
    task<void> task = ReadLoopAsync(reader, 64, TraceBufferRead);
    task.wait();
}

A snippet of the output using a sample file with contents of ‘1234567890…’ (and some newlines) repeated a bunch of times:

[T=12676] starting read loop
[T=18444] starting read
[T=18444] completing read request
1234567890123456789012345678901234567890123456789012345678901234
[T=22240] starting read
[T=18444] completing read request
5678901234567890
1234567890123456789012345678901234567890123456
[T=22240] starting read
[T=18444] completing read request
7890123456789012345678901234567890
1234567890123456789012345678
[T=22240] starting read
[T=18444] completing read request
9012345678901234567890123456789012345678901234567890
1234567890
[T=22240] starting read
[T=18444] completing read request
1234567890123456789012345678901234567890123456789012345678901234
 . . . 

Hopefully this gives you a better idea of how async works at the core. The rules for overlapped I/O are rather complex but modern C++ design can help ease some of the burden and create safe, reusable patterns.

2 thoughts on “Introducing overlapped I/O

  1. Pingback: Managed interop for native overlapped | WriteAsync .NET

  2. Pingback: Async or non-blocking? – WriteAsync .NET

Leave a Reply

Your email address will not be published. Required fields are marked *