В прошлый раз я рассказывал о Data Flow подходе в разработке многозадачных приложений. Та заметка получилась совершенно без кода и примеров. В этот раз постараюсь исправиться – кода будет много. И весь код будет посвящён иллюстрации всё той же темы: Data Flow или конвейер данных.
Задача
Итак, нам нужно написать командный интерпретатор, обрабатывающий текстовый поток команд, разделённых символом перевода строки. Каждая команда должна быть: прочитана, распознана и выполнена. Результат выполнения записывается в выходной поток. Причём ответы должны поступать на выход в том же порядке, в котором команды поступали на вход.
Напомню: чтобы максимально задействовать возможности современных многоядерных машин, устранить Lock Convoy при синхронизации данных на выходе и удовлетворить требование о строгой последовательности вывода результата, я решил воспользоваться техникой Data Flow. Процесс обработки команды состоит из 4-ёх последовательных операций, которые зависят друг от друга только входными и выходными данными: считать команду, распознать команду, выполнить команду и записать результат. Любые 2 соседние операции могут выполняться одновременно (параллельно) обрабатывая 2 идущие друг за другом команды: когда выполняется 1-ая команда, интерпретатор может распознавать 2-ую.
Чтобы конвейер работал, кто-то должен поставлять ему данные, а именно: последовательно перебирать строки в потоке, отправляя каждую из них на вход конвейеру. Как не сложно заметить, этот кто-то в том числе позаботится и о считывании команды – первой операции конвейера. На эту роль наилучшим образом подходит агент. Кстати, создавая разных агентов для разных входных потоков, можно совершенно бесплатно получить поддержку нескольких одновременных сессий.
Финальное решение видеться следующим:
Решение
Для начала определим базовый класс с интерфейсом для объектов-команд:
Определим классы для команд gettime и echo:
Ещё нам понадобится класс представляющий собой "ошибочную команду", результатом исполнения которой будет сообщение об ошибке:
Теперь пришла пора заняться конвейером и агентом. В очередной раз я упрощу себе задачу и воспользуюсь возможностями Asynchronous Agents Library (AAL). На этот раз, помимо уже известного класса Concurrency::agent, в построении конвейера мне помогут классы блоков сообщений: Concurrency::transformer и Concurrency::unbounded_buffer. "Блоки сообщений" могут сцепляться друг с другом образуя конвейер (для этого используются либо функций link_target / link_source, либо перегруженные версии конструкторов). Библиотека AAL включает множество разнообразных блоков сообщений. Все они умеют принимать некоторые данные, обрабатывать их и отправлять результат стоящему за ними очередному блоку. Различия заключаются в способе обработки.
Блок класса transformer исполняет над полученными данными определённую при его конструировании функцию. Причём выполняется эта функция в отдельном потоке. Этот класс представляет собой шаблон, первый аргумент которого определяет тип данных, принимаемых блоком, второй – тип преобразованных данных, поступающих на выход. Блок класса unbound_buffer принимает и хранит данные определённого типа.
Итак, нам нужно определить агента, который будет содержать 2 сцепленных блока сообщений типа Concurrency::transformer: один для функции parse_command, другой для функции run_command. К последнему блоку необходимо прицепить Concurrency::unbound_buffer, для аккумулирования результата. Вот этот агент:
Наибольший интерес в этом классе представляет декларация объектов runner_ и parser_ и их инициализация в конструкторе, приводящая к образованию конвейера.
Агент принимает входные данные в конструкторе в виде текстового потока. Считывание и разбор на строки осуществляется в методе run, после чего каждая строка отправляется на начало конвейера – объект parser_ – с помощью функции Concurrency::asend:
Каждый из методов parse_command и run_command просто преобразует поступающие на вход данные. При этом ни один из них не содержит никакой специфики многопоточной обработки данных. Каждый из этих методов может быть вызван в однопоточном окружении, например для целей отладки и юнит-тестирования:
И завершающий аккорд – инициализация агента и вывод результата в теле основного потока приложения:
Полный код приложения можно скачать здесь.
Критика
Внимательный читатель мог заметить одно узкое место данного подхода. Если какой-то элемент конвейера начнёт длительную обработку данных или уснёт в ожидание ресурса, то весь конвейер встанет. Очевидно, что для таких случаев Data Flow подход в чистом виде совершенно не годится. Придётся совмещать различные техники и идти на компромиссы. К сожалению строгого рецепта для такого смешения не существует.
Задача
Итак, нам нужно написать командный интерпретатор, обрабатывающий текстовый поток команд, разделённых символом перевода строки. Каждая команда должна быть: прочитана, распознана и выполнена. Результат выполнения записывается в выходной поток. Причём ответы должны поступать на выход в том же порядке, в котором команды поступали на вход.
Напомню: чтобы максимально задействовать возможности современных многоядерных машин, устранить Lock Convoy при синхронизации данных на выходе и удовлетворить требование о строгой последовательности вывода результата, я решил воспользоваться техникой Data Flow. Процесс обработки команды состоит из 4-ёх последовательных операций, которые зависят друг от друга только входными и выходными данными: считать команду, распознать команду, выполнить команду и записать результат. Любые 2 соседние операции могут выполняться одновременно (параллельно) обрабатывая 2 идущие друг за другом команды: когда выполняется 1-ая команда, интерпретатор может распознавать 2-ую.
Чтобы конвейер работал, кто-то должен поставлять ему данные, а именно: последовательно перебирать строки в потоке, отправляя каждую из них на вход конвейеру. Как не сложно заметить, этот кто-то в том числе позаботится и о считывании команды – первой операции конвейера. На эту роль наилучшим образом подходит агент. Кстати, создавая разных агентов для разных входных потоков, можно совершенно бесплатно получить поддержку нескольких одновременных сессий.
Финальное решение видеться следующим:
- основной поток исполнения (в данном случае – это основной поток приложения, в реальной жизни – это может быть поток, принявший соединение из сокета или что-то в этом роде) создаёт агента, передаёт ему входной поток данных и ожидает поступления результатов, чтобы отправить их на выход;
- агент перебирает строки во входном потоке и для каждой строки запускает конвейер, состоящий из 2-ух операций – распознавание команды и исполнение команды;
- результат выполнения команды c последнего шага конвейера передаётся асинхронно в основной поток, запустивший всю операцию, где он считывается и отправляется на выход.
Решение
Для начала определим базовый класс с интерфейсом для объектов-команд:
class command;
typedef std::shared_ptr<command> command_ptr;
class command
{
public:
virtual ~command() { }
const std::string& name() const
{
return name_;
}
virtual command_ptr clone(const std::string &arg) const = 0;
virtual std::string execute() const = 0;
protected:
explicit command(const std::string &name)
: name_(name) { }
private:
std::string name_;
};
Обратите внимание на 2 чисто-виртуальных метода: clone и execute. Данный класс сочетает в себе сразу 2 стандартных шаблона проектирования из каталога "банды четырёх": Prototype и Command.typedef std::shared_ptr<command> command_ptr;
class command
{
public:
virtual ~command() { }
const std::string& name() const
{
return name_;
}
virtual command_ptr clone(const std::string &arg) const = 0;
virtual std::string execute() const = 0;
protected:
explicit command(const std::string &name)
: name_(name) { }
private:
std::string name_;
};
Определим классы для команд gettime и echo:
// Usage: gettime [gmt]
class get_time : public command
{
public:
static const get_time SAMPLE;
virtual command_ptr clone(const std::string &arg) const
{
return command_ptr(new get_time(arg));
}
virtual std::string execute() const;
private:
explicit get_time(const std::string &arg = "")
: command("gettime"),
gmt_("gmt" == arg),
bad_argument_(!gmt_ && !arg.empty())
{ }
bool gmt_;
bool bad_argument_;
};
const get_time get_time::SAMPLE;
// Usage: echo <text>
class echo : public command
{
public:
static const echo SAMPLE;
virtual command_ptr clone(const std::string &arg) const
{
return command_ptr(new echo(arg));
}
virtual std::string execute() const
{
return text_;
}
private:
explicit echo(const std::string &arg = "")
: command("echo"), text_(arg) { }
std::string text_;
};
const echo echo::SAMPLE;
class get_time : public command
{
public:
static const get_time SAMPLE;
virtual command_ptr clone(const std::string &arg) const
{
return command_ptr(new get_time(arg));
}
virtual std::string execute() const;
private:
explicit get_time(const std::string &arg = "")
: command("gettime"),
gmt_("gmt" == arg),
bad_argument_(!gmt_ && !arg.empty())
{ }
bool gmt_;
bool bad_argument_;
};
const get_time get_time::SAMPLE;
// Usage: echo <text>
class echo : public command
{
public:
static const echo SAMPLE;
virtual command_ptr clone(const std::string &arg) const
{
return command_ptr(new echo(arg));
}
virtual std::string execute() const
{
return text_;
}
private:
explicit echo(const std::string &arg = "")
: command("echo"), text_(arg) { }
std::string text_;
};
const echo echo::SAMPLE;
Ещё нам понадобится класс представляющий собой "ошибочную команду", результатом исполнения которой будет сообщение об ошибке:
class bad_command : public command
{
public:
static const std::string NAME;
...
};
{
public:
static const std::string NAME;
...
};
Теперь пришла пора заняться конвейером и агентом. В очередной раз я упрощу себе задачу и воспользуюсь возможностями Asynchronous Agents Library (AAL). На этот раз, помимо уже известного класса Concurrency::agent, в построении конвейера мне помогут классы блоков сообщений: Concurrency::transformer и Concurrency::unbounded_buffer. "Блоки сообщений" могут сцепляться друг с другом образуя конвейер (для этого используются либо функций link_target / link_source, либо перегруженные версии конструкторов). Библиотека AAL включает множество разнообразных блоков сообщений. Все они умеют принимать некоторые данные, обрабатывать их и отправлять результат стоящему за ними очередному блоку. Различия заключаются в способе обработки.
Блок класса transformer исполняет над полученными данными определённую при его конструировании функцию. Причём выполняется эта функция в отдельном потоке. Этот класс представляет собой шаблон, первый аргумент которого определяет тип данных, принимаемых блоком, второй – тип преобразованных данных, поступающих на выход. Блок класса unbound_buffer принимает и хранит данные определённого типа.
Итак, нам нужно определить агента, который будет содержать 2 сцепленных блока сообщений типа Concurrency::transformer: один для функции parse_command, другой для функции run_command. К последнему блоку необходимо прицепить Concurrency::unbound_buffer, для аккумулирования результата. Вот этот агент:
class command_processor : public Concurrency::agent
{
typedef std::map<std::string, const command*> commands_dict;
// t_line::first - line
// t_line::second - "end of file" flag
typedef std::pair<std::string, bool> t_line;
// t_command::first - command
// t_command::second - "end of file" flag
typedef std::pair<command_ptr, bool> t_command;
public:
explicit command_processor(std::istream &input);
virtual ~command_processor()
{
Concurrency::agent::wait(this);
}
static void register_command(
const std::string &name, const command *cmd)
{
COMMANDS[name] = cmd;
}
std::string read(bool &next)
{
// Read results from output_.
// This operation blocks a thread if output_ is empty.
std::pair<std::string, bool> out =
Concurrency::receive(output_);
next = out.second;
return out.first;
}
protected:
void run();
private:
static t_line run_command(const t_command &command);
static t_command parse_command(const t_line &line);
static commands_dict COMMANDS;
std::istream &input_;
// receive: t_line
Concurrency::unbounded_buffer<t_line> output_;
// receive: t_comand; send: t_line - result
Concurrency::transformer<t_command, t_line> runner_;
// receive: t_line - unparsed command; send: t_command
Concurrency::transformer<t_line, t_command> parser_;
};
command_processor::command_processor(std::istream &input)
: input_(input),
// the output buffer:
output_(),
// the run step is linked with the output buffer:
runner_(run_command, &output_),
// the execute step is linked with the run step:
parser_(parse_command, &runner_)
{ }
{
typedef std::map<std::string, const command*> commands_dict;
// t_line::first - line
// t_line::second - "end of file" flag
typedef std::pair<std::string, bool> t_line;
// t_command::first - command
// t_command::second - "end of file" flag
typedef std::pair<command_ptr, bool> t_command;
public:
explicit command_processor(std::istream &input);
virtual ~command_processor()
{
Concurrency::agent::wait(this);
}
static void register_command(
const std::string &name, const command *cmd)
{
COMMANDS[name] = cmd;
}
std::string read(bool &next)
{
// Read results from output_.
// This operation blocks a thread if output_ is empty.
std::pair<std::string, bool> out =
Concurrency::receive(output_);
next = out.second;
return out.first;
}
protected:
void run();
private:
static t_line run_command(const t_command &command);
static t_command parse_command(const t_line &line);
static commands_dict COMMANDS;
std::istream &input_;
// receive: t_line
Concurrency::unbounded_buffer<t_line> output_;
// receive: t_comand; send: t_line - result
Concurrency::transformer<t_command, t_line> runner_;
// receive: t_line - unparsed command; send: t_command
Concurrency::transformer<t_line, t_command> parser_;
};
command_processor::command_processor(std::istream &input)
: input_(input),
// the output buffer:
output_(),
// the run step is linked with the output buffer:
runner_(run_command, &output_),
// the execute step is linked with the run step:
parser_(parse_command, &runner_)
{ }
Наибольший интерес в этом классе представляет декларация объектов runner_ и parser_ и их инициализация в конструкторе, приводящая к образованию конвейера.
Агент принимает входные данные в конструкторе в виде текстового потока. Считывание и разбор на строки осуществляется в методе run, после чего каждая строка отправляется на начало конвейера – объект parser_ – с помощью функции Concurrency::asend:
void command_processor::run()
{
std::string line;
do
{
char ch = L'\0';
input_.get(ch);
if (ch == '\n' || input_.eof()) {
// send line to parser
t_line new_line = std::make_pair(
line, !input_.eof());
Concurrency::asend(
parser_, new_line);
line.clear();
} else if (input_.good()) {
line += ch;
} else {
// send "bad command" to parser and stop processing
t_line abort_line = std::make_pair(
bad_command::NAME, false);
Concurrency::asend(
parser_, abort_line);
}
} while (input_ != 0);
done();
}
{
std::string line;
do
{
char ch = L'\0';
input_.get(ch);
if (ch == '\n' || input_.eof()) {
// send line to parser
t_line new_line = std::make_pair(
line, !input_.eof());
Concurrency::asend(
parser_, new_line);
line.clear();
} else if (input_.good()) {
line += ch;
} else {
// send "bad command" to parser and stop processing
t_line abort_line = std::make_pair(
bad_command::NAME, false);
Concurrency::asend(
parser_, abort_line);
}
} while (input_ != 0);
done();
}
Каждый из методов parse_command и run_command просто преобразует поступающие на вход данные. При этом ни один из них не содержит никакой специфики многопоточной обработки данных. Каждый из этих методов может быть вызван в однопоточном окружении, например для целей отладки и юнит-тестирования:
command_processor::t_command
command_processor::parse_command(
const command_processor::t_line &line)
{
// detect the command's name
size_t pos = line.first.find_first_of(" \t");
const std::string name = line.first.substr(0, pos);
// detect the command's arguments
pos = line.first.find_first_not_of(" \t", pos);
const std::string arg = std::string::npos == pos
? std::string() : line.first.substr(pos);
// get the command object
commands_dict::const_iterator it = COMMANDS.find(name);
if (COMMANDS.end() != it) {
return std::make_pair(
it->second->clone(arg), line.second);
}
// bad command
return std::make_pair(
command_ptr(new bad_command(line.first)),
line.second);
}
command_processor::t_line
command_processor::run_command(
const command_processor::t_command &command)
{
return std::make_pair(
command.first->execute(), command.second);
}
command_processor::parse_command(
const command_processor::t_line &line)
{
// detect the command's name
size_t pos = line.first.find_first_of(" \t");
const std::string name = line.first.substr(0, pos);
// detect the command's arguments
pos = line.first.find_first_not_of(" \t", pos);
const std::string arg = std::string::npos == pos
? std::string() : line.first.substr(pos);
// get the command object
commands_dict::const_iterator it = COMMANDS.find(name);
if (COMMANDS.end() != it) {
return std::make_pair(
it->second->clone(arg), line.second);
}
// bad command
return std::make_pair(
command_ptr(new bad_command(line.first)),
line.second);
}
command_processor::t_line
command_processor::run_command(
const command_processor::t_command &command)
{
return std::make_pair(
command.first->execute(), command.second);
}
И завершающий аккорд – инициализация агента и вывод результата в теле основного потока приложения:
// open file stream
std::ifstream data;
data.exceptions(std::ios::badbit);
data.open(filename);
// register all known commands
command_processor::register_command(
get_time::SAMPLE.name(), &get_time::SAMPLE);
command_processor::register_command(
echo::SAMPLE.name(), &echo::SAMPLE);
// start
command_processor processor(data);
processor.start();
// read result and write into console
bool next;
do {
std::cout << processor.read(next) << std::endl;
} while (next);
std::ifstream data;
data.exceptions(std::ios::badbit);
data.open(filename);
// register all known commands
command_processor::register_command(
get_time::SAMPLE.name(), &get_time::SAMPLE);
command_processor::register_command(
echo::SAMPLE.name(), &echo::SAMPLE);
// start
command_processor processor(data);
processor.start();
// read result and write into console
bool next;
do {
std::cout << processor.read(next) << std::endl;
} while (next);
Полный код приложения можно скачать здесь.
Критика
Внимательный читатель мог заметить одно узкое место данного подхода. Если какой-то элемент конвейера начнёт длительную обработку данных или уснёт в ожидание ресурса, то весь конвейер встанет. Очевидно, что для таких случаев Data Flow подход в чистом виде совершенно не годится. Придётся совмещать различные техники и идти на компромиссы. К сожалению строгого рецепта для такого смешения не существует.
1 комментарий:
И все таки, при всем моем уважении к "плюсам", ООП в них явно страдает. Я еще я совсем не понимаю зачем везде префиксы пространств имен - в .Net так не делают, просто делают using, в С++ это наверное using namespace xyz;
Отправить комментарий