среда, 18 января 2012 г.

MapReduce и Erlang – лучше когда вместе

Пришла пора добавить немного кода к рассказанной в прошлый раз теории. Вот только идея написать MapReduce на C++ или C# мне показалась слишком банальной и тривиальной. Захотелось немного экзотики. Если уж реализовывать "супер распределённый" алгоритм, то только на языке, который считается одним из наиболее приспособленных к параллельным вычислениям. Я говорю об Erlang`е.

Вот такой вот странный зверь – Erlang.
Erlang – это функциональный язык, специально заточенный для нужд многопоточной разработки. По праву одним из самых лучшим инструментов для параллельного программирования его делают ряд особенностей, перечисленных ниже.

1. Встроенные в язык средства управления потоками – в Erlang`е они называются легковесными процессами – выполняют за вас такие рутинные операции, как: организация пулов, распределение задач между физическими потоками операционной системы, масштабирование на несколько ядер и даже физических машин. Чтобы создать новый процесс достаточно вызвать встроенную функцию spawn следующим образом:
Pid = spawn(module, function, ArgsList)
А чтобы поток создался на другом физическом хосте, где запущена виртуальная машина Erlang`а и которой доступен ваш модуль module с функцией function, достаточно лишь слегка модифицировать изначальный код:
Pid = spawn(instance@HOST, module, function, ArgsList)
Где instance – это имя экземпляра виртуальной машины Erlang`а, запущенной на физическом хосте с именем HOST.

2. Поддержка на уровне языка механизма обмена асинхронными сообщениями позволяет вам не думать об организации очереди сообщений, решении проблемы гарантированности доставки, блокировках и так далее. Для передачи сообщений в синтаксисе языка выделен специальный оператор – '!'. Следующая строчка кода отправляет сообщение Message потоку с идентификатором Pid:
Pid ! Message
Забавно, что в качестве сообщения могут выступать данные любого типа.

Для приёма сообщений служит ключевое слово receive. Например, в следующем примере демонстрируется приём сообщений 2-ух типов – атома 'stop' (что такое атом я расскажу чуть ниже) и кортежа, состоящего из атома 'work' и произвольных данных:
receive
    stop ->
        process_stop();
    {work, X} ->
        process_work(X),
        consumer_impl()
end;

3. Приложение на Erlang`е оперирует только неизменяемыми (immutable) переменными. Это означает, что значение, присвоенное переменной единожды, не может быть изменено впоследствии. Это ограничение позволяет избежать таких проблем, как: side-эффекты, конкуренция за ресурсы и блокировки.

4. Виртуальная машина Erlang`а и его сборщик мусора были изначально спроектированы с прицелом на многопоточную разработку и системы мягкого реального времени. Более подробно про эти особенности можно прочитать здесь.

Ещё несколько деталей синтаксиса языка, чтобы дальнейший код был более понятен. Любая переменная должна начинаться с большой буквы:
Operation = work
Значение в правой части выражения – это атом, специальный тип данных в Erlang`е, представляющий собой просто имя. В отличии от переменных атомы начинаются с маленькой буквы: done, stop, work.

Функция определяется следующим образом:
process_work(X) –>
    io:format("Do work: ~p~n", [X]),
    done.
Данные, записанные в последней строке, считаются неявно возвращаемыми из функции. В данном примере – это атом 'done'.

Комментарии помечаются символом % в начале строки. Более подробную информацию о языке можно найти на сайте разработчиков. На RSDN.RU есть неплохой перевод на русский язык статьи "Getting Started with Erlang User's Guide" из каталога официальной документации: "Начала работы с Erlang".

MapReduce в 50 строк
Итак, вернёмся к MapReduce и попытаемся с его помощью посчитать количество различных слов в романе "Война и мир".

Для начала определим функцию мастера:
% Master takes Input and 2 lambdas: Map-function and Reduce-function
master(Input, Map, Reduce) ->
    % run map
    MapCount = run_map(Input, Map),
    % collect map results and sort them using a lambda for comparison
    MapResults = lists:sort(
        fun({LeftKey, _}, {RightKey, _}) -> LeftKey =< RightKey end,
        wait_results(MapCount, [])),

    % run reduce
    ReduceCount = run_reduce(MapResults, Reduce),
    % collect results
    wait_results(ReduceCount, []).
Функция мастера принимает 3 параметра: список обрабатываемых элементов и 2 лямбда-функции. Первая лямбда-функция должна реализовывать процедуру Map, вторая – Reduce. Вместо того, чтобы непрерывно группировать результат, получаемый от Map-агентов, мастер складывает все ответы в единый список (формируется внутри функции wait_results), который сортирует по ключу перед вызовом фазы Reduce.

Функция run_map достаточно проста для понимания и не требует дополнительных комментариев:
run_map(Input, Fun) ->
    % for each elemt in list call a lambda which
    % calls spawn_worker function

    lists:foreach(
        fun(Element) -> spawn_worker(Element, Fun) end, Input),
    length(Input).

Гораздо больший интерес представляет собой функция run_reduce. Если быть точным, то в приложении определено 2 функции run_reduce – 2-я и 3-я аргументами. Причём вторая функция имеет несколько специализаций для конкретных значений передаваемых аргументов:
run_reduce([{Key, Value} | SortedList], Fun) ->
    run_reduce(Key, [Value], SortedList, Fun, 0).

% This set of run_reduce functions groups the received
% sorted list by Key and calls spawn_worker for each
% pair {Key, ValuesList}

run_reduce(Key, ValueList,
        [{Key, Value} | SortedList], Fun, Count) ->
    run_reduce(Key, [Value | ValueList],
        SortedList, Fun, Count);

run_reduce(Key, ValueList,
        [{NewKey, Value} | SortedList], Fun, Count) ->
    spawn_worker({Key, ValueList}, Fun),
    run_reduce(NewKey, [Value], SortedList, Fun, Count + 1);

run_reduce(Key, ValueList, [], Fun, Count) ->
    spawn_worker({Key, ValueList}, Fun),
    Count + 1.
Данный набор функций перебирает список пар, отсортированных по первому значению пары – ключу, строит по данному ключу группировку и для каждой новой пары вида {Ключ, Список Значений} вызывает функцию spawn_work.

Функция spawn_work вызывается и из функции run_map, и из функции run_reduce:
% Function which schedules a worker process.
% It receives an Element which should be processed and
% a worker function (Map or Reduce)

spawn_worker(Element, Fun) ->
    % get current PID
    CurrentPID = self(),

    % define 2 lambdas: Emit and Worker
    Emit =
        fun(Key, Val) ->
            CurrentPID ! {Key, Val}
        end,

    Worker =
        fun() ->
            % Worker calls the received function and
            % pass Element and Emit as an arguments

            Fun(Element, Emit),
            % notify parent that worker is done
            CurrentPID ! worker_done
        end,

    % schedule the worker process
    spawn(fun() -> Worker() end).
Она подготавливает 2 лямбда-функции: Emit и Worker, которые использует для запуска нового процесса (лямбда Worker) и передачи в качестве параметра функции-обработчику (лямбда Emit). Эти лямбда-функции нужны для формирования корректных сообщений мастеру. Функция-обработчик (Map или Reduce) будет просто вызывать лямбду Emit, когда у неё будут готовы данные для отправки мастеру.

В случае обоих фаз мастер собирает результаты с помощью функции wait_results. Эта функция слушает сообщения от агентов и складывает ответы в возвращаемый список:
% This instance of wait_results is called when
% the 1st argument is 0

wait_results(0, Results) ->
    Results;

% This instance of wait_results is called when
% the 1st element is not 0

wait_results(Count, Results) –>
    % wait for messages
    receive
        {Key, Val} –>
            % extend Results list with the received pair
            % and call wait_results recursively

            wait_results(Count, [{Key, Val} | Results]);
        worker_done –>
            % some worker is done: decrease count of
            % worker and call wait_results recursively

            wait_results(Count - 1, Results)
    end.

И наконец функции read_words и count_words, выступающие в качестве рабочих функций Map и Reduce агентов соответственно:
read_words(FileName, Emit) ->
    {ok, FileContent} = file:read_file(FileName),
    Words = string:tokens(
        erlang:binary_to_list(FileContent),
        " \t\n\r,.;:-!?\"'()"),
    lists:foreach(
        fun(Word) ->
            if
                Word /= "" -> Emit(Word, 1);
                Word == "" -> false
            end
        end, Words).

count_words({Word, Counts}, Emit) ->
    Emit(Word, length(Counts)).

Теперь можно вызвать функцию мастер, передав ей список файлов и 2 функции – read_words и count_words:
Results = master(["p1.txt", "p2.txt", "p3.txt"],
    fun read_words/2, fun count_words/2)

Для простоты эксперимента, чтобы не загромождать код лишней работой, связанной с обработкой кодировок, я взял электронную версию английского перевода романа "Война и мир", разбил текст руками на 5 примерно равных частей и запустил вычисление с включенным таймером (функция timer:tc).

Результат весьма впечатляющий:
5> {WPTiming, ok} = timer:tc(mapreduce, main, [["wp1.txt", "wp2.txt", "wp3.txt", "wp4.txt", "wp5.txt"]]).
{4078000,ok}
6> WPTiming / 1000000 .
4.078

На полный подсчёт понадобилось чуть больше 4 секунд на машине с 2-ух ядерным Intel Core 2 Duo. По-моему, это круто.

Архив с полным исходным текстом приложения и тестовыми данными можно скачать здесь.

4 комментария:

redbeard комментирует...

Было бы интересно увидеть аналоги кода на java и c# :)

Алексей Коротаев комментирует...

Возможно C# и Java практичнее с точки зрения применения - чаще используются, но пример будет не таким наглядным. В 50 строк боюсь не управиться. В результате за деревьями леса можно не увидеть. А Erlang как нельзя лучше иллюстрирует саму суть идеи. Основываясь на этом примере можно уже и более конкретную реализацию под себя написать. Если будет время - накидаю что-нибудь, но не обещаю.

Алексей комментирует...

На Java можно уложиться и в менее чем 50 строк, если использовать готовые фреймворки для MapReduce (Hadoop).

Unknown комментирует...

Жаль, что ссылка на полный исходный текст уже не рабочая. Очень хотелось бы посмотреть!

Отправить комментарий