среда, 18 апреля 2012 г.

Асинхронно в WinAPI. Продолжение

Нет ничего сложного в том, чтобы отправить потоку асинхронное сообщение длинной в пару байт. Гораздо интереснее проделать этот же трюк с неограниченным объёмом информации. Итак, цели прежние: организовать асинхронное взаимодействие между потоками с минимальным количеством блокировок. Лишь немного изменились условия: теперь сообщение – это строка произвольной длинны.

Для начала нужно понять, чем же новые условия усложняют реализацию. На первый взгляд может показаться, что нет никакой разницы: просто вместо числа мы должны передавать функции QueueUserAPC адрес отправляемой строки. Так? – Нет! Задумаемся на секунду над вопросом: кто и когда будет выделять и освобождать память для хранения передаваемых данных? Для успешной реализации нужно решить сразу несколько проблем, которые никак напрямую не связаны с параллельным программированием и асинхронным взаимодействием. Рассмотрим каждую из них.

Три проблемы
Первая проблема, с которой мы сталкиваемся – это выделение памяти. Совершенно очевидно, что мы не можем передать адрес переменной из локального контекста: как только мы выйдем за границы видимости, переменная будет уничтожена и память будет освобождена. Следовательно, прежде чем передать потоку адрес строки с помощью функции QueueUserAPC, мы должны динамически выделить буфер памяти и скопировать туда передаваемые данные целиком. Адрес именно этого буфера и отправится в виде асинхронного сообщения.
Вторая проблема напрямую следует из первой. Раз память была выделена – её нужно освободить. Вопрос лишь в том, кто и когда это сделает. Самое прямое и простое решение – это поручить освобождение памяти принимающему потоку. Оно же и самое не правильное. Принимающий поток может быть уничтожен извне или аварийно завершиться при обработке предыдущего сообщения. В этом случае все сообщения, посланные функцией QueueUserAPC, теряются. Вместе с ними теряются и адреса выделенных блоков памяти, что приводит к невозможности их освобождения. Есть и ещё одна причина, о которой я расскажу в самом конце.

Наиболее правильное решение выглядит примерно так:
  1. Передающая сторона сохраняет адрес каждой отправленной строки в динамическом списке вместе с флагом, являющимся признаком получения сообщения.
  2. Адрес структуры, сохранённой в списке и содержащей адрес строки и флаг, передаётся функции QueueUserAPC.
  3. Поток, получивший сообщение, копирует строку в свой собственный буфер для дальнейшей обработки и тут же изменяет значение флага, сигнализируя противоположной стороне о том, что сообщение получено.
  4. Передающая сторона периодически обходит список отправленных сообщений (например, при отправке очередного) и, сверяясь с флагом, очищает те, которые были успешно приняты.
  5. Передающая сторона может использовать список переданных сообщений для корректной очистки памяти при завершении приложения или для повторной отправки сообщений при аварийном завершении одного или нескольких потоков-обработчиков.
Обращаться к флагу потоки будут с помощью Interlocked-функций – это даст корректную и независимую от особенностей конкретного компилятора реализацию конкурентного доступа без излишних блокировок.

Ну и последняя проблема – чисто техническая. Логично поместить реализацию обработки сообщений прямо в теле принимающего потока, а функцию, отвечающую непосредственно за их "получение" реализовать как обработчик APC. В этом случае мы убиваем сразу 2-ух зайцев:
  • передающая сторона будет избавлена от необходимости выбора значения 1-ого параметра при вызове функции QueueUserAPC;
  • решена проблема вложенного вызова обработчиков APC, которая возникает, если текущий обработчик APC косвенно или открыто выполнит тревожное ожидание – в этом случае может произойти вызов следующего обработчика, ожидающего в очереди, что бывает фатально для не подготовленного к таким трюкам кода.
Есть лишь одно НО: обработчик APC лишён возможности передать что-либо в тело функции потока, в рамках которого он был вызван. Решить эту проблему и возвратить полученное сообщение в тело потока нам поможет Thread Local Storage. Все полученные сообщения будут складываться в единую очередь, расположенную в TLS. Поскольку каждый поток будет иметь свою копию очереди в своей TLS, нам не нужно беспокоиться о конкурентном доступе и плодить блокировки.

А теперь слайды
Сначала приведу некоторые глобальные определения и код функций producer и consumer. Они практически ничем не отличаются от аналогичных функций из предыдущей заметки. Все отличия сводятся лишь к типу сообщения (в данном случае – это просто std::string) и небольшому коду, инициализирующему очередь сообщений в TLS:
const DWORD ON_EXIT = 0xFFFFFFFF;

struct ControlEvents {
    HANDLE onExit;
    HANDLE onStart;
};

struct Message {
    std::string message;
    volatile LONG notHandled;
};

// declaration of variable in TLS
// this code is specific for MS VS
__declspec(thread)
volatile std::queue<std::string> *messagesQueuePtrTLS = 0;

unsigned
 int __stdcall
consumer(void *data)
{
    ControlEvents events = *reinterpret_cast<ControlEvents*>(data);

    // define messages queue
    volatile std::queue<std::string> messagesQueue;
    // and store pointer to the messages queue in TLS
    messagesQueuePtrTLS = &messagesQueue;

    std::cout << "[0x"
        << std::hex
        << std::setw(8)
        << std::setfill('0')
        << ::GetCurrentThreadId()
        << "] Consumer." << std::endl;
    ::SetEvent(events.onStart);

    DWORD result = ERROR_SUCCESS;
    std::string message;
    while (ERROR_SUCCESS == (result = receiveMessage(
            events.onExit, messagesQueue, message))) {
        std::cout
            << "[0x"
            << std::hex
            << std::setw(8)
            << std::setfill('0')
            << ::GetCurrentThreadId()
            << "] Message: " << message << std::endl;
    }

    messagesQueuePtrTLS = 0;
    return result == ON_EXIT ? ERROR_SUCCESS : result;
}


DWORD
producer()
{
    typedef std::vector<std::string> TWorkMessages;
    TWorkMessages works;
    works.reserve(12);
    works.push_back("stand up");
    works.push_back("dress");
    works.push_back("cook");
    works.push_back("eat");
    works.push_back("dish up");
    works.push_back("wash");
    works.push_back("close the door");
    works.push_back("drive");
    works.push_back("work");
    works.push_back("work");
    works.push_back("work");
    works.push_back("work");

    ControlEvents events = {0};
    HANDLE workerThreadH = 0;
    DWORD ret = ERROR_SUCCESS;

    try
    {
        events.onExit = ::CreateEvent(0, true, false, 0);
        if (0 == events.onExit) {
            throw ::GetLastError();
        }

        events.onStart = ::CreateEvent(0, true, false, 0);
        if (0 == events.onStart) {
            throw ::GetLastError();
        }

        std::cout << "[0x"
            << std::hex
            << std::setw(8)
            << std::setfill('0')
            << ::GetCurrentThreadId()
            << "] Producer." << std::endl;

        workerThreadH = reinterpret_cast<HANDLE>(
            _beginthreadex(0, 0, &consumer, &events, 0, 0));

        if (0 == workerThreadH) {
            throw ::GetLastError();
        }

        if (WAIT_OBJECT_0 != ::WaitForSingleObject(
            events.onStart, INFINITE)) {
            throw ::GetLastError();
        }

        std::list<Message> sentMessages;
        for(TWorkMessages::const_iterator it = works.begin();
            works.end() != it; ++it)
        {
            DWORD res = sendMessage(
                workerThreadH, *it, sentMessages);
            if (ERROR_SUCCESS != res) {
                throw res;
            }
        }

        // wait a bit
        ::Sleep(1000);
    } catch (DWORD errCode) {
        ret = errCode;
    }

    if (0 != events.onExit) {
        if (0 != workerThreadH) {
            ::SetEvent(events.onExit);
            if (WAIT_OBJECT_0 != ::WaitForSingleObject(
                workerThreadH, 5000)) {
                ::TerminateThread(workerThreadH, 1);
            }
            ::CloseHandle(workerThreadH);
        }
        ::CloseHandle(events.onExit);
    }

    if (0 != events.onStart) {
        ::CloseHandle(events.onStart);
    }

    return ret;
}

Всё самое интересное сосредоточено в 3-ёх достаточно простых функциях: sendMessage (вызывается из кода функции producer для отправки сообщений), receiveMessage (вызывается в потоке функции consumer для получения очередного сообщения) и receiver (обработчик APC, выполняющий "приём" сообщения и его копирование в очередь сообщений потока в TLS).

Начнём с функции отправки сообщений – sendMessage. Эта функция принимает 3 аргумента: описатель потока-получателя, список ранее отправленных сообщений и новое сообщение для отправки. Первым делом она пробегает по списку отправленных сообщений и удаляет те, которые были успешно "приняты". После чего формирует новое сообщение, сохраняет в список отправленных и отправляет потоку-получателю APC, передав в качестве параметра указатель на новое сообщение:
DWORD
sendMessage(
    HANDLE target,
    const std::string &message,
    std::list<Message> &sentMessages)
{
    // iterate over alls sent messages
    for(std::list<Message>::iterator it = sentMessages.begin();
        sentMessages.end() != it; /* do nothing */) {
        // check if message is already handled
        if (InterlockedExchangePointer(&(it->notHandled),
            it->notHandled) == 0) {
            // message was already handled - delete it
            it = sentMessages.erase(it);
        } else {
            ++it;
        }
    }

    // prepare new message
    sentMessages.push_back(Message());
    sentMessages.back().message = message;
    sentMessages.back().notHandled = 1;

    // place memory barier to prevent a re-ordering of read
    // and write operations; this, for example, flushes CPU
   
// caches (see details here and here); we need to do this
    // to guarantee that the write operations which update the

    // message's fields will be completed before the write
    // operation which passes the message's address into
    // the consumer thread by means of QueueUserAPC call,
    // the point is the documentation does not specify whether
    // QueueUserAPC function places memory barier or not

   
MemoryBarrier();

    // and send message
    if (::QueueUserAPC(&receiver, target, reinterpret_cast<
            ULONG_PTR>(&sentMessages.back())) == 0) {
        // no-zero result code means error
        return ::GetLastError();
    }

    return ERROR_SUCCESS;
}

При вызове функции QueueUserAPC в качестве обработчика APC указывается функция receiver. Она копирует полученное сообщение в очередь, указатель на которую поток-consumer разместил в TLS при старте, и сбрасывает флаг notHandled в 0, сообщив тем самым отправителю, что сообщение принято и может быть очищено:
void __stdcall receiver(ULONG_PTR data)
{
    Message *message = reinterpret_cast<Message*>(data);

    // copy the received message into TLS
    const_cast<std::queue<std::string>*>(
        messagesQueuePtrTLS)->push(message->message);

    // and mark it as handled
    InterlockedExchangePointer(&(message->notHandled), 0);
}

И наконец функция, извлекающая очередное сообщение из очереди и передающая его получателю. Надо отметить, что она самая тривиальная из этой троицы: извлекает очередное сообщение из очереди, которая передаётся ей в качестве аргумента, и возвращает его. Если же очередь пуста, то уходит в тревожное ожидание на событии завершения:
DWORD
receiveMessage(
    HANDLE onExit,
    volatile std::queue<std::string> &messagesQueue,
    std::string &message)
{
    if (const_cast<std::queue<std::string>&>(
        messagesQueue).empty()) {
        const DWORD waitRes = ::WaitForSingleObjectEx(
            onExit, INFINITE, true);

        if (WAIT_OBJECT_0 == waitRes) {
            return ON_EXIT;
        }

        if (WAIT_IO_COMPLETION != waitRes) {
            return ::GetLastError();
        }
    }

    message = const_cast<std::queue<std::string>&>(
        messagesQueue).front();
    const_cast<std::queue<std::string>&>(
        messagesQueue).pop();

    return ERROR_SUCCESS;
}

Ты не видишь суслика, а он есть!
К сожалению, в этом коротком и простом примере я не смог устранить все лишние блокировки. И хотя они не заметны невооружённым взглядом, на самом деле они присутствуют.

Для упрощения понимания и экономии своего времени я повсеместно использую стандартную библиотеку, которая в свою очередь активно использует операторы и функции распределения памяти С++ и CRT (new/delete и malloc/free). Эти вызовы в конечном итоге приводят к обращениям к той или иной куче (heap). Причём все потоки одного процесса будут общаться с одной и той же кучей. Вот здесь и возникают те самые скрытые блокировки. Компания Microsoft утверждает, что до минимума сведено как само количество блокировок при обращении к куче, так и время, на которое эти блокировки срабатывают. Но полностью устранить их не удалось.

При решении большинства обычных задач, вы никогда не столкнётесь с падением производительности, вызванным блокировками в куче Windows. Но в случае высоконагруженных систем с ними придётся считаться. В этом случае необходимо полностью отказаться от использования стандартных механизмов распределения памяти C++ в потоках-получателях (кстати, концепция аллокаторов стандартной библиотеки позволяет добиться этого наименьшей кровью).

Выше я упоминал об ещё одной причине, почему нельзя поручать потоку-получателю освобождение памяти под сообщением. Как не сложно догадаться, этой причиной как раз и являются скрытые блокировки при выделении и освобождении памяти в куче. Если вы будете выделять память в одном потоке, а освобождать в другом, то когда вы упрётесь в проблемы производительности, переделать такой код будет гораздо сложнее.

Комментариев нет:

Отправить комментарий