Различные средства параллельного программирования от библиотек вроде Concurrency Runtime до специализированных языков вроде Erlang предоставляют такую типовую операцию, как отправка асинхронного сообщения потоку. А каким образом можно реализовать эту возможность, опираясь только функционал WinAPI. Попробуем разобраться.
На самом деле реализовать асинхронный обмен сообщениями между потоками не сложно. Сложно сделать это с минимальным количеством взаимных блокировок. В идеале отправка и приём сообщения не должны блокировать взаимодействующие потоки. Допустим только один тип блокировки: блокировка принимающего потока в ожидании очередного сообщения.
Не всякая классика заслуживает внимания
Итак, какие системные средства, позволяют достичь поставленные цели? Первое что приходит в голову – это функция PostThreadMessage, отправляющая сообщение в очередь сообщений потока. Это та же самая очередь, которая используется для управления графическими элементами пользовательского интерфейса. Поэтому у данного метода есть один существенный недостаток: если поток случайно (или намеренно) откроет модальное диалоговое окно, все сообщения, посланные потоку в это время будут обработаны окном и потеряны безвозвратно.
На втором месте идёт тяжёлая артиллерия в лице pipe`ов с перекрывающимся (ovelapped) доступом. В этом случае каналом передачи сообщений является именованный pipe, открытый в режиме перекрывающегося доступа. Передача и приём сообщений осуществляется с помощью операций асинхронной записи и чтения соответственно. Богатый функционал и обширные возможности этого подхода не дают одного – простоты. В случае, если передаваемые сообщения представляют собой обычные целые числа, pipe`ы кажутся излишними.
Очереди разные важны, очереди разные нужны
Метод, который я хотел рассмотреть, основа на использовании очереди вызовов асинхронных процедур (asynchronous procedure calls – APC). Подобная очередь связана с каждым потоком и активно используется операционной системой для обработки таких, например, операций, как перекрывающий ввод-вывод. Вызов асинхронной процедуры, добавленный в очередь потока, будет осуществлён в контексте выполнения этого потока.
Добавлять асинхронные вызовы в очередь потока может не только система, но и любое приложение пользовательского режима. Для этого достаточно вызвать системную функция QueueUserAPC, передав ей в качестве параметров:
1) указатель на функцию асинхронной процедуры;
2) описатель (HANDLE) потока;
3) произвольные данные типа ULONG_PTR, которые будут переданы функции асинхронной процедуры при её вызове.
Если вы сделаете всё правильно, то любая попытка тревожного ожидания (alertable waiting), в потоке, в очередь которого добавлен APC, завершиться вызовом функции асинхронной процедуры. При этом сама функция ожидания вернёт управление в поток с кодом завершения WAIT_IO_COMPLETION. Инициировать тревожное ожидание можно с помощью расширенных версий синхронизирующих функций: SleepEx, WaitForSingleObjectEx, WaitForMultipleObjectsEx и так далее. Каждая из них имеет дополнительный параметр bAlertable типа BOOL, передача ему значения TRUE вызывает ожидание в тревожном состоянии.
Не сложно заметить, что комбинации указателя на функцию и параметра типа ULONG_PTR уже достаточно, чтобы наладить передачу простейших асинхронных сообщений. Во многих случаях этого оказывается вполне достаточно. Тем более, что сама функция является частью передаваемого сообщения. Ниже пример простейшей реализации данного подхода.
Для начала определим передаваемые сообщения и одну асинхронную процедуру, которая будет их принимать и обрабатывать:
Теперь определим рабочий поток – потребитель сообщений. Он достаточно примитивен: после запуска сообщает о своей готовности с помощью объекта-события onStart; после чего ожидает в тревожном режиме перехода в сигнальное состояние объекта-события onExit, сигнализирующего о завершении работы. Причём ожидание запускается в цикле:
Самой запутанной является функция, запускающая поток обработки и отправляющая ему сообщения. Причём наиболее сложная её часть связана с обработкой ошибок:
В следующий раз я рассмотрю более сложный вариант реализации обмена асинхронными сообщениями с помощью APC, позволяющий передавать сообщения любой длинны, а не только sizeof(ULONG_PTR).
На самом деле реализовать асинхронный обмен сообщениями между потоками не сложно. Сложно сделать это с минимальным количеством взаимных блокировок. В идеале отправка и приём сообщения не должны блокировать взаимодействующие потоки. Допустим только один тип блокировки: блокировка принимающего потока в ожидании очередного сообщения.
Не всякая классика заслуживает внимания
Итак, какие системные средства, позволяют достичь поставленные цели? Первое что приходит в голову – это функция PostThreadMessage, отправляющая сообщение в очередь сообщений потока. Это та же самая очередь, которая используется для управления графическими элементами пользовательского интерфейса. Поэтому у данного метода есть один существенный недостаток: если поток случайно (или намеренно) откроет модальное диалоговое окно, все сообщения, посланные потоку в это время будут обработаны окном и потеряны безвозвратно.
На втором месте идёт тяжёлая артиллерия в лице pipe`ов с перекрывающимся (ovelapped) доступом. В этом случае каналом передачи сообщений является именованный pipe, открытый в режиме перекрывающегося доступа. Передача и приём сообщений осуществляется с помощью операций асинхронной записи и чтения соответственно. Богатый функционал и обширные возможности этого подхода не дают одного – простоты. В случае, если передаваемые сообщения представляют собой обычные целые числа, pipe`ы кажутся излишними.
Очереди разные важны, очереди разные нужны
Метод, который я хотел рассмотреть, основа на использовании очереди вызовов асинхронных процедур (asynchronous procedure calls – APC). Подобная очередь связана с каждым потоком и активно используется операционной системой для обработки таких, например, операций, как перекрывающий ввод-вывод. Вызов асинхронной процедуры, добавленный в очередь потока, будет осуществлён в контексте выполнения этого потока.
Добавлять асинхронные вызовы в очередь потока может не только система, но и любое приложение пользовательского режима. Для этого достаточно вызвать системную функция QueueUserAPC, передав ей в качестве параметров:
1) указатель на функцию асинхронной процедуры;
2) описатель (HANDLE) потока;
3) произвольные данные типа ULONG_PTR, которые будут переданы функции асинхронной процедуры при её вызове.
Если вы сделаете всё правильно, то любая попытка тревожного ожидания (alertable waiting), в потоке, в очередь которого добавлен APC, завершиться вызовом функции асинхронной процедуры. При этом сама функция ожидания вернёт управление в поток с кодом завершения WAIT_IO_COMPLETION. Инициировать тревожное ожидание можно с помощью расширенных версий синхронизирующих функций: SleepEx, WaitForSingleObjectEx, WaitForMultipleObjectsEx и так далее. Каждая из них имеет дополнительный параметр bAlertable типа BOOL, передача ему значения TRUE вызывает ожидание в тревожном состоянии.
Не сложно заметить, что комбинации указателя на функцию и параметра типа ULONG_PTR уже достаточно, чтобы наладить передачу простейших асинхронных сообщений. Во многих случаях этого оказывается вполне достаточно. Тем более, что сама функция является частью передаваемого сообщения. Ниже пример простейшей реализации данного подхода.
Для начала определим передаваемые сообщения и одну асинхронную процедуру, которая будет их принимать и обрабатывать:
enum WorkMessage {
stand_up,
dress,
cook,
eat,
dish_up,
wash,
close_the_door,
drive,
work
};
void __stdcall printMessage(ULONG_PTR data)
{
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Message: " << (WorkMessage)data << std::endl;
}
stand_up,
dress,
cook,
eat,
dish_up,
wash,
close_the_door,
drive,
work
};
void __stdcall printMessage(ULONG_PTR data)
{
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Message: " << (WorkMessage)data << std::endl;
}
Теперь определим рабочий поток – потребитель сообщений. Он достаточно примитивен: после запуска сообщает о своей готовности с помощью объекта-события onStart; после чего ожидает в тревожном режиме перехода в сигнальное состояние объекта-события onExit, сигнализирующего о завершении работы. Причём ожидание запускается в цикле:
unsigned int __stdcall simpleConsumer(void *data)
{
ControlEvents events = *reinterpret_cast<ControlEvents*>(data);
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Consumer." << std::endl;
::SetEvent(events.onStart);
DWORD result = ERROR_SUCCESS, waitRes;
while (WAIT_OBJECT_0 != (waitRes =
::WaitForSingleObjectEx(events.onExit, INFINITE, true))) {
if (WAIT_IO_COMPLETION != waitRes) {
// error happens
result = ::GetLastError();
break;
}
}
return result;
}
{
ControlEvents events = *reinterpret_cast<ControlEvents*>(data);
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Consumer." << std::endl;
::SetEvent(events.onStart);
DWORD result = ERROR_SUCCESS, waitRes;
while (WAIT_OBJECT_0 != (waitRes =
::WaitForSingleObjectEx(events.onExit, INFINITE, true))) {
if (WAIT_IO_COMPLETION != waitRes) {
// error happens
result = ::GetLastError();
break;
}
}
return result;
}
Самой запутанной является функция, запускающая поток обработки и отправляющая ему сообщения. Причём наиболее сложная её часть связана с обработкой ошибок:
DWORD
simpleAPCProducer()
{
typedef std::vector<WorkMessage> TWorkMessages;
TWorkMessages works;
works.reserve(12);
works.push_back(stand_up);
works.push_back(dress);
works.push_back(cook);
works.push_back(eat);
works.push_back(dish_up);
works.push_back(wash);
works.push_back(close_the_door);
works.push_back(drive);
works.push_back(work);
works.push_back(work);
works.push_back(work);
works.push_back(work);
ControlEvents events = {0};
HANDLE workerThreadH = 0;
DWORD ret = ERROR_SUCCESS;
try {
events.onExit = ::CreateEvent(0, true, false, 0);
if (0 == events.onExit) {
throw ::GetLastError();
}
events.onStart = ::CreateEvent(0, true, false, 0);
if (0 == events.onStart) {
throw ::GetLastError();
}
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Producer." << std::endl;
workerThreadH = reinterpret_cast<HANDLE>(
_beginthreadex(0, 0, &simpleConsumer, &events, 0, 0));
if (0 == workerThreadH) {
throw ::GetLastError();
}
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
events.onStart, INFINITE)) {
throw ::GetLastError();
}
for(TWorkMessages::const_iterator it = works.begin();
works.end() != it; ++it)
{
::QueueUserAPC(
&printMessage, workerThreadH, (ULONG_PTR)*it);
}
// wait a litle
::Sleep(1000);
} catch (DWORD errCode) {
ret = errCode;
}
if (0 != events.onExit) {
if (0 != workerThreadH) {
::SetEvent(events.onExit);
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
workerThreadH, 5000)) {
::TerminateThread(workerThreadH, 1);
}
::CloseHandle(workerThreadH);
}
::CloseHandle(events.onExit);
}
if (0 != events.onStart) {
::CloseHandle(events.onStart);
}
return ret;
}
simpleAPCProducer()
{
typedef std::vector<WorkMessage> TWorkMessages;
TWorkMessages works;
works.reserve(12);
works.push_back(stand_up);
works.push_back(dress);
works.push_back(cook);
works.push_back(eat);
works.push_back(dish_up);
works.push_back(wash);
works.push_back(close_the_door);
works.push_back(drive);
works.push_back(work);
works.push_back(work);
works.push_back(work);
works.push_back(work);
ControlEvents events = {0};
HANDLE workerThreadH = 0;
DWORD ret = ERROR_SUCCESS;
try {
events.onExit = ::CreateEvent(0, true, false, 0);
if (0 == events.onExit) {
throw ::GetLastError();
}
events.onStart = ::CreateEvent(0, true, false, 0);
if (0 == events.onStart) {
throw ::GetLastError();
}
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Producer." << std::endl;
workerThreadH = reinterpret_cast<HANDLE>(
_beginthreadex(0, 0, &simpleConsumer, &events, 0, 0));
if (0 == workerThreadH) {
throw ::GetLastError();
}
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
events.onStart, INFINITE)) {
throw ::GetLastError();
}
for(TWorkMessages::const_iterator it = works.begin();
works.end() != it; ++it)
{
::QueueUserAPC(
&printMessage, workerThreadH, (ULONG_PTR)*it);
}
// wait a litle
::Sleep(1000);
} catch (DWORD errCode) {
ret = errCode;
}
if (0 != events.onExit) {
if (0 != workerThreadH) {
::SetEvent(events.onExit);
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
workerThreadH, 5000)) {
::TerminateThread(workerThreadH, 1);
}
::CloseHandle(workerThreadH);
}
::CloseHandle(events.onExit);
}
if (0 != events.onStart) {
::CloseHandle(events.onStart);
}
return ret;
}
В следующий раз я рассмотрю более сложный вариант реализации обмена асинхронными сообщениями с помощью APC, позволяющий передавать сообщения любой длинны, а не только sizeof(ULONG_PTR).
Комментариев нет:
Отправить комментарий