среда, 14 сентября 2011 г.

Агент Асинхронность, ваш выход!

В предыдущей заметке я постарался объяснить, почему попытка решить задачу распараллеливания "в лоб" может потерпеть фиаско. И рассказал о модели рабочих элементов – самом распространённом паттерне параллельного мира, призванном эффективно решать задачу масштабирования. Пришла пора поговорить о его недостатке...


Независимость – это когда от тебя ничего не зависит...
Подход, связанный с разбиение многопоточного приложения на задачи, прекрасно работает ровно до тех пор, пока задачи более-менее не зависимы друг от друга. Но, как только возникнет необходимость организовать взаимодействие между рабочими элементами, вы столкнётесь с проблемой. Точно с такой же проблемой в своё время столкнулись программисты древности, использовавшие парадигму процедурного программирования. Это не что иное, как проблема связывания данных с функциями. Ответом на этот вызов явился объектно-ориентированный подход.

По аналогии с классическими парадигмами программирования, можно сказать, что модель рабочих элементов является процедурным подходом в разработке многопоточных приложений. Решить его проблемы призвана модель асинхронных агентов (или модель актёров) – объектно-ориентированная парадигма параллельного мира.


Поручите работу агентам.
Асинхронный агент или актёр – это объект. Любой агент реализует ровно одну задачу. При этом все вычисления в рамках этой задачи осуществляются в отдельном потоке (который, как водится, предоставляется агенту из пула во временное пользование). В процессе выполнения задачи агент может менять своё внутреннее состояние, сосредоточенное в полях данных. Внешние наблюдатели через вызов публичных методов агента могут запрашивать его состояние или отправлять ему сообщения, влияющие на вычисления. Любой публичный метод агента умеет либо возвращать внутреннее состояние, либо передавать данные выполняющейся задаче (некоторые методы совмещают обе эти функции): ни один публичный метод не выполняет никаких вычислений. Задача, реализуемая агентом, может быть как конечной (в этом случае агент может перейти из состояния "запущен" в состояние "завершён"), так и выполняемой непрерывно (в этом случае задача завершается принудительно, например, при уничтожении агента).


Тяжело в учении, легко на работе...
Иногда бывает проще воспринимать подобные вещи, глядя на реальный пример. Поэтому чтобы проиллюстрировать сказанное, я реализую паттерн "Producer / Consumer" с помощью асинхронных агентов. Если кто забыл, то суть этого паттерна заключается в разнесении по разным потокам поставщика (Producer`а) и потребителя (Consumer`а) данных. Я не буду реализовывать инфраструктуру асинхронных агентов с нуля, а воспользуюсь возможностями Asynchronous Agents Library (AAL), поставляющейся вместе с MS Visual Studio 2010. В качестве эксперимента я решу таким мудрёным способом простую задачу подсчёта числа слов в текстовом файле.

Агент Producer будет считывать данные из переданного ему текстового файла и построчно передавать их объекту Consumer`у, который будет подсчитывать число слов в очередной строке (для простоты примем, что файл не содержит переносов слов).

Для начала определим интерфейс Consumer`а:
class i_lines_processor
{
public:
    virtual ~i_lines_processor() { }
    virtual void submit_line(const std::wstring &) = 0;
    virtual void complete() = 0;
};
Producer будет вызывать метод submit_line для каждой очередной прочитанной строки и завершит обработку вызовом метода complete.

Реализация агента Producer`а (или reader`а в данном случае) весьма тривиальна:
class reader_agent : public Concurrency::agent
{
public:
    reader_agent(
            std::wistream &input,
            i_lines_processor &processor)
        : input_(input), processor_(processor) { }

    virtual ~reader_agent()
    {
        Concurrency::agent::wait(this);
    }

protected:
    void run()
    {
        std::wstring line;
        do 
        {
            wchar_t ch = L'\0';
            input_.get(ch);

            if (ch == L'\n' || input_.eof()) {
                processor_.submit_line(line);
                line.clear();
            } else if (input_.good()) {
                line += ch;
            }
        } while (input_ != 0);

        processor_.complete();
        done();
    }

private:
    std::wistream &input_;
    i_lines_processor &processor_;
};

Самым интересным в этом коде является наследование от класса Concurrency::agent и реализация метода run. Это и есть простейшая реализация асинхронного агента с помощью AAL. Как несложно догадаться метод run как раз и содержит все вычисления, которые будут производиться асинхронно.

Теперь можно реализовать Consumer`а данных, в данном примере его я назвал просто counter`ом:
class counter_agent :
    public i_lines_processor,
    public Concurrency::agent
{
public:
    virtual ~counter_agent()
    {
        Concurrency::agent::wait(this);
    }

    virtual void submit_line(const std::wstring &line)
    {
        // send a new line into the agent's body
        Concurrency::send(lines_, line + L"\n");
    }

    virtual void complete()
    {
        // 3 line feeds will indicate the end of file
        Concurrency::send(lines_, std::wstring(L"\n\n\n"));
    }

    int get_count()
    {
        // obtain the result
        return Concurrency::receive(count_);
    }

protected:
    void run()
    {
        std::wstring line;
        int count = 0;

        // obtain the next line
        while ((line = Concurrency::receive(lines_))
            != L"\n\n\n")
        {
            int word_size = 0;
            std::for_each(
                line.begin(), line.end(),
                [&] (wchar_t ch) {

               int code = std::char_traits<
                    wchar_t>::to_int_type(ch);
                if (iswalnum(code)) {
                    word_size++;
                } else {
                    if (word_size != 0) {
                        count++;
                        word_size = 0;
                    }
                }
            });
        }

        // send result to reader
        Concurrency::send(count_, count);
        done();
    }

private:
    // received lines
    Concurrency::unbounded_buffer<std::wstring> lines_;

    // result
    Concurrency::single_assignment<int> count_;
};

В данном классе следует обратить внимание на операции по синхронизации данных в поток агента и возврату результата из него: функции Concurrency::send и Concurrency::receive, а также классы инкапсулирующие данные – Concurrency::unbounded_buffer и Concurrency::single_assignment.

Осталось добавить необходимые заголовки:
#include <agents.h>

#include <algorithm>
#include <cwchar>
#include <fstream>
#include <iostream>
#include <string>

И добавить код запускающий агентов:
int _tmain(int argc, _TCHAR* argv[])
{
    std::wifstream data;
    data.exceptions(std::ios::badbit);
    data.open("bigtest.txt");

    counter_agent counter;
    reader_agent reader(data, counter);

    counter.start();
    reader.start();

    std::wcout << counter.get_count() << std::endl;

    return 0;
}

Если вы теперь скомпилируете этот код и запустите на процессоре с несколькими ядрами, вы сможете наблюдать, как ваше приложение загрузит сразу 2 ядра на 100%.

5 комментариев:

Анонимный комментирует...

А если 4 ядра или 64?

Алексей Коротаев комментирует...

Ну если я не ошибаюсь, то Windows не поддерживает 64 ядра вообще (по крайней мере 32-ух разрядная), максимум 32 ;-). Но если вынести за скобки это ограничение, то можно создавать, например, по одному counter-агенту на каждую строку (AAL имеет внутренний пул и сама прекрасно позаботится о правильно выделении потоков), тогда удасться загрузить все ядра. Я не стал излишне усложнять пример.

Опять же, если приложение само по себе содержит много разных агентов (много задач, выполняемых одновременно), то все ядра загрузятся сами по себе без специальных ухищрений. Просто конкретно в данном случае задачи ровно 2 :-). А, например, в случае компьютерной игры (;-)) хороший кандидат на то, чтобы быть агентом - это любой NPC :-).

Главный посыл в том, что асинхронные агенты позволяют достаточно легко реализовать многопоточное масштабируемое приложение и организовать хранение и передачу состояния между потоками. При этом вы вообще не заморачиваетесь с многопоточностью как таковой.

Анонимный комментирует...

Вот на этом примере как раз выглядит достаточно сложно в плане масштабируемости той же самой задачи.

Обобщенных средств нет для реализации параллелизма, все делается от конкретной задачи. Агенты красиво выглядят на синтетических задачах с точки зрения ООА/ООП, но имеют недостатки.

Как нетрудно увидеть, функциональный подход (PLINQ) выглядит более естественно, оперирует абстракциями высокого уровня и самое главное он масштабируемый, даже на такой синтетической задаче выше:

var lines = File.ReadLines("bigtest.txt").AsParallel();
var counts = lines.SelectMany(line => line.Split(new char[] { ' ' }))
.GroupBy(word => word)
.SelectMany(group => new[] { new KeyValuePair(group.Key, group.Count()) });

Данный код масштабируется на большее кол-во ядер/процессоров чем два. Опять же - это решение практически в лоб, там можно еще всякого навернуть.

Еще есть трудности отладки агентов (когда их много).

Алексей Коротаев комментирует...

Как вы правильно заметили, любое средство параллелизма имеет свои недостатки и достоинства. И функциональный подход совсем не панацея. Если говорить именно за эффективную масштабируемость данного примера, то тут вообще всё легко решается с помощью одного цикла и старого доброго task-based подхода (см. предыдущий пост). А пример с LINQ такой же синтетический в данном случае, как и мой, потому что он именно это и делает - обходит циклом файл и на каждую строку ставит задачу.

Мне нужен был достаточно наглядный и простой пример, для демонстрации того, что же такое асинхронные агенты и как они могут взаимодействовать. Я не случайно в самом начале подчеркнул, что главная выгода агентов именно во взаимодействии друг с другом, реализовать которое гораздо проще, как ни крути (вот как вы сделаете многопоточный обсчёт персонажей компьютерной игры в функциональном стиле или с помощью LINQ, особенно если они взаимодействуют друг с другом). Если же делать агенты достаточно атомарными, реализующими некоторую простую задачу, то вы получите и масштабируемость просто за счёт того, что их много.

Священную войну про отладку лучше вообще не начинать ;-), ибо просто отлаживать только однопоточное приложение. Но у агентов в этом плане есть один неоспаримый плюс: за счёт своей атомарности и замкнутости они легко покрываются unit-тестами. Опять же отладить один агент в однопоточном окружении в контексте тестового приложения можно и сделать это достаточно просто.

Алексей Коротаев комментирует...

А вообще у меня предложение. Если у вас есть, что сказать интересного на эту тему, давайте вы где-нибудь напишете и опубликуете свои мысли, а я дам у себя ссылку. Это будет конструктивнее и полезнее, для читающего сообщества, чем вести священные войны на тему кто круче: ООП или функциональщина ;-).

Отправить комментарий