понедельник, 24 октября 2011 г.

Data Flow, или Данные управляют командами. Практика

В прошлый раз я рассказывал о Data Flow подходе в разработке многозадачных приложений. Та заметка получилась совершенно без кода и примеров. В этот раз постараюсь исправиться – кода будет много. И весь код будет посвящён иллюстрации всё той же темы: Data Flow или конвейер данных.


Задача

Итак, нам нужно написать командный интерпретатор, обрабатывающий текстовый поток команд, разделённых символом перевода строки. Каждая команда должна быть: прочитана, распознана и выполнена. Результат выполнения записывается в выходной поток. Причём ответы должны поступать на выход в том же порядке, в котором команды поступали на вход.

Напомню: чтобы максимально задействовать возможности современных многоядерных машин, устранить Lock Convoy при синхронизации данных на выходе и удовлетворить требование о строгой последовательности вывода результата, я решил воспользоваться техникой Data Flow. Процесс обработки команды состоит из 4-ёх последовательных операций, которые зависят друг от друга только входными и выходными данными: считать команду, распознать команду, выполнить команду и записать результат. Любые 2 соседние операции могут выполняться одновременно (параллельно) обрабатывая 2 идущие друг за другом команды: когда выполняется 1-ая команда, интерпретатор может распознавать 2-ую.

Чтобы конвейер работал, кто-то должен поставлять ему данные, а именно: последовательно перебирать строки в потоке, отправляя каждую из них на вход конвейеру. Как не сложно заметить, этот кто-то в том числе позаботится и о считывании команды – первой операции конвейера. На эту роль наилучшим образом подходит агент. Кстати, создавая разных агентов для разных входных потоков, можно совершенно бесплатно получить поддержку нескольких одновременных сессий.

Финальное решение видеться следующим:
  • основной поток исполнения (в данном случае – это основной поток приложения, в реальной жизни – это может быть поток, принявший соединение из сокета или что-то в этом роде) создаёт агента, передаёт ему входной поток данных и ожидает поступления результатов, чтобы отправить их на выход;
  • агент перебирает строки во входном потоке и для каждой строки запускает конвейер, состоящий из 2-ух операций – распознавание команды и исполнение команды;
  • результат выполнения команды c последнего шага конвейера передаётся асинхронно в основной поток, запустивший всю операцию, где он считывается и отправляется на выход.


Решение

Для начала определим базовый класс с интерфейсом для объектов-команд:
class command;
typedef std::shared_ptr<command> command_ptr;

class command
{
public:
    virtual ~command() { }

    const std::string& name() const
    {
        return name_;
    }

    virtual command_ptr clone(const std::string &arg) const = 0;

    virtual std::string execute() const = 0;

protected:
    explicit command(const std::string &name)
        : name_(name) { }

private:
    std::string name_;
};
Обратите внимание на 2 чисто-виртуальных метода: clone и execute. Данный класс сочетает в себе сразу 2 стандартных шаблона проектирования из каталога "банды четырёх": Prototype и Command.

Определим классы для команд gettime и echo:
// Usage: gettime [gmt]
class get_time : public command
{
public:
    static const get_time SAMPLE;

    virtual command_ptr clone(const std::string &arg) const
    {
        return command_ptr(new get_time(arg));
    }

    virtual std::string execute() const;

private:
    explicit get_time(const std::string &arg = "")
        : command("gettime"),
        gmt_("gmt" == arg),
        bad_argument_(!gmt_ && !arg.empty())
    { }

    bool gmt_;
    bool bad_argument_;
};
const get_time get_time::SAMPLE;

// Usage: echo <text>
class echo : public command
{
public:
    static const echo SAMPLE;

    virtual command_ptr clone(const std::string &arg) const
    {
        return command_ptr(new echo(arg));
    }

    virtual std::string execute() const
    {
        return text_;
    }

private:
    explicit echo(const std::string &arg = "")
        : command("echo"), text_(arg) { }

    std::string text_;
};
const echo echo::SAMPLE;

Ещё нам понадобится класс представляющий собой "ошибочную команду", результатом исполнения которой будет сообщение об ошибке:
class bad_command : public command
{
public:
    static const std::string NAME;

...
};

Теперь пришла пора заняться конвейером и агентом. В очередной раз я упрощу себе задачу и воспользуюсь возможностями Asynchronous Agents Library (AAL). На этот раз, помимо уже известного класса Concurrency::agent, в построении конвейера мне помогут классы блоков сообщений: Concurrency::transformer и Concurrency::unbounded_buffer. "Блоки сообщений" могут сцепляться друг с другом образуя конвейер (для этого используются либо функций link_target / link_source, либо перегруженные версии конструкторов). Библиотека AAL включает множество разнообразных блоков сообщений. Все они умеют принимать некоторые данные, обрабатывать их и отправлять результат стоящему за ними очередному блоку. Различия заключаются в способе обработки.

Блок класса transformer исполняет над полученными данными определённую при его конструировании функцию. Причём выполняется эта функция в отдельном потоке. Этот класс представляет собой шаблон, первый аргумент которого определяет тип данных, принимаемых блоком, второй – тип преобразованных данных, поступающих на выход. Блок класса unbound_buffer принимает и хранит данные определённого типа.

Итак, нам нужно определить агента, который будет содержать 2 сцепленных блока сообщений типа Concurrency::transformer: один для функции parse_command, другой для функции run_command. К последнему блоку необходимо прицепить Concurrency::unbound_buffer, для аккумулирования результата. Вот этот агент:
class command_processor : public Concurrency::agent
{
    typedef std::map<std::string, const command*> commands_dict;

    // t_line::first - line
    // t_line::second - "end of file" flag
    typedef std::pair<std::string, bool> t_line;

    // t_command::first - command
    // t_command::second - "end of file" flag
    typedef std::pair<command_ptr, bool> t_command;

public:
    explicit command_processor(std::istream &input);

    virtual ~command_processor()
    {
        Concurrency::agent::wait(this);
    }

    static void register_command(
        const std::string &name, const command *cmd)
    {
        COMMANDS[name] = cmd;
    }

    std::string read(bool &next)
    {
        // Read results from output_.
        // This operation blocks a thread if output_ is empty.
        std::pair<std::string, bool> out =
            Concurrency::receive(output_);
        next = out.second;
        return out.first;
    }

protected:
    void run();

private:
    static t_line run_command(const t_command &command);
    static t_command parse_command(const t_line &line);

    static commands_dict COMMANDS;

    std::istream &input_;

    // receive: t_line
    Concurrency::unbounded_buffer<t_line> output_;

    // receive: t_comand; send: t_line - result
    Concurrency::transformer<t_command, t_line> runner_;
    
    // receive: t_line - unparsed command; send: t_command
    Concurrency::transformer<t_line, t_command> parser_;
};

command_processor::command_processor(std::istream &input)
    : input_(input),
    // the output buffer:
    output_(),
    // the run step is linked with the output buffer:
    runner_(run_command, &output_),
    // the execute step is linked with the run step:
    parser_(parse_command, &runner_)
{ }

Наибольший интерес в этом классе представляет декларация объектов runner_ и parser_ и их инициализация в конструкторе, приводящая к образованию конвейера.

Агент принимает входные данные в конструкторе в виде текстового потока. Считывание и разбор на строки осуществляется в методе run, после чего каждая строка отправляется на начало конвейера – объект parser_ – с помощью функции Concurrency::asend:
void command_processor::run()
{
    std::string line;
    do 
    {
        char ch = L'\0';
        input_.get(ch);

        if (ch == '\n' || input_.eof()) {
            // send line to parser
            t_line new_line = std::make_pair(
                line, !input_.eof());

            Concurrency::asend(
                parser_, new_line);

            line.clear();
        } else if (input_.good()) {
            line += ch;
        } else {
            // send "bad command" to parser and stop processing
            t_line abort_line = std::make_pair(
                bad_command::NAME, false);
            Concurrency::asend(
                parser_, abort_line);
        }
    } while (input_ != 0);

    done();
}

Каждый из методов parse_command и run_command просто преобразует поступающие на вход данные. При этом ни один из них не содержит никакой специфики многопоточной обработки данных. Каждый из этих методов может быть вызван в однопоточном окружении, например для целей отладки и юнит-тестирования:
command_processor::t_command
command_processor::parse_command(
    const command_processor::t_line &line)
{
    // detect the command's name
    size_t pos = line.first.find_first_of(" \t");
    const std::string name = line.first.substr(0, pos);
    
    // detect the command's arguments
    pos = line.first.find_first_not_of(" \t", pos);
    const std::string arg = std::string::npos == pos
        ? std::string() : line.first.substr(pos);

    // get the command object
    commands_dict::const_iterator it = COMMANDS.find(name);
    if (COMMANDS.end() != it) {
        return std::make_pair(
            it->second->clone(arg), line.second);
    }

    // bad command
    return std::make_pair(
        command_ptr(new bad_command(line.first)),
        line.second);
}

command_processor::t_line
command_processor::run_command(
    const command_processor::t_command &command)
{
    return std::make_pair(
        command.first->execute(), command.second);
}

И завершающий аккорд – инициализация агента и вывод результата в теле основного потока приложения:
// open file stream
std::ifstream data;
data.exceptions(std::ios::badbit);
data.open(filename);

// register all known commands
command_processor::register_command(
    get_time::SAMPLE.name(), &get_time::SAMPLE);
command_processor::register_command(
    echo::SAMPLE.name(), &echo::SAMPLE);

// start
command_processor processor(data);
processor.start();

// read result and write into console
bool next;
do {
    std::cout << processor.read(next) << std::endl;
} while (next);

Полный код приложения можно скачать здесь.


Критика
Внимательный читатель мог заметить одно узкое место данного подхода. Если какой-то элемент конвейера начнёт длительную обработку данных или уснёт в ожидание ресурса, то весь конвейер встанет. Очевидно, что для таких случаев Data Flow подход в чистом виде совершенно не годится. Придётся совмещать различные техники и идти на компромиссы. К сожалению строгого рецепта для такого смешения не существует.

1 комментарий:

Анонимный комментирует...

И все таки, при всем моем уважении к "плюсам", ООП в них явно страдает. Я еще я совсем не понимаю зачем везде префиксы пространств имен - в .Net так не делают, просто делают using, в С++ это наверное using namespace xyz;

Отправить комментарий