Пришла пора добавить немного кода к рассказанной в прошлый раз теории. Вот только идея написать MapReduce на C++ или C# мне показалась слишком банальной и тривиальной. Захотелось немного экзотики. Если уж реализовывать "супер распределённый" алгоритм, то только на языке, который считается одним из наиболее приспособленных к параллельным вычислениям. Я говорю об Erlang`е.
Вот такой вот странный зверь – Erlang.
Erlang – это функциональный язык, специально заточенный для нужд многопоточной разработки. По праву одним из самых лучшим инструментов для параллельного программирования его делают ряд особенностей, перечисленных ниже.
1. Встроенные в язык средства управления потоками – в Erlang`е они называются легковесными процессами – выполняют за вас такие рутинные операции, как: организация пулов, распределение задач между физическими потоками операционной системы, масштабирование на несколько ядер и даже физических машин. Чтобы создать новый процесс достаточно вызвать встроенную функцию spawn следующим образом:
2. Поддержка на уровне языка механизма обмена асинхронными сообщениями позволяет вам не думать об организации очереди сообщений, решении проблемы гарантированности доставки, блокировках и так далее. Для передачи сообщений в синтаксисе языка выделен специальный оператор – '!'. Следующая строчка кода отправляет сообщение Message потоку с идентификатором Pid:
Для приёма сообщений служит ключевое слово receive. Например, в следующем примере демонстрируется приём сообщений 2-ух типов – атома 'stop' (что такое атом я расскажу чуть ниже) и кортежа, состоящего из атома 'work' и произвольных данных:
3. Приложение на Erlang`е оперирует только неизменяемыми (immutable) переменными. Это означает, что значение, присвоенное переменной единожды, не может быть изменено впоследствии. Это ограничение позволяет избежать таких проблем, как: side-эффекты, конкуренция за ресурсы и блокировки.
4. Виртуальная машина Erlang`а и его сборщик мусора были изначально спроектированы с прицелом на многопоточную разработку и системы мягкого реального времени. Более подробно про эти особенности можно прочитать здесь.
Ещё несколько деталей синтаксиса языка, чтобы дальнейший код был более понятен. Любая переменная должна начинаться с большой буквы:
Функция определяется следующим образом:
Комментарии помечаются символом % в начале строки. Более подробную информацию о языке можно найти на сайте разработчиков. На RSDN.RU есть неплохой перевод на русский язык статьи "Getting Started with Erlang User's Guide" из каталога официальной документации: "Начала работы с Erlang".
MapReduce в 50 строк
Итак, вернёмся к MapReduce и попытаемся с его помощью посчитать количество различных слов в романе "Война и мир".
Для начала определим функцию мастера:
Функция run_map достаточно проста для понимания и не требует дополнительных комментариев:
Гораздо больший интерес представляет собой функция run_reduce. Если быть точным, то в приложении определено 2 функции run_reduce – 2-я и 3-я аргументами. Причём вторая функция имеет несколько специализаций для конкретных значений передаваемых аргументов:
Функция spawn_work вызывается и из функции run_map, и из функции run_reduce:
В случае обоих фаз мастер собирает результаты с помощью функции wait_results. Эта функция слушает сообщения от агентов и складывает ответы в возвращаемый список:
И наконец функции read_words и count_words, выступающие в качестве рабочих функций Map и Reduce агентов соответственно:
Теперь можно вызвать функцию мастер, передав ей список файлов и 2 функции – read_words и count_words:
Для простоты эксперимента, чтобы не загромождать код лишней работой, связанной с обработкой кодировок, я взял электронную версию английского перевода романа "Война и мир", разбил текст руками на 5 примерно равных частей и запустил вычисление с включенным таймером (функция timer:tc).
Результат весьма впечатляющий:
На полный подсчёт понадобилось чуть больше 4 секунд на машине с 2-ух ядерным Intel Core 2 Duo. По-моему, это круто.
Архив с полным исходным текстом приложения и тестовыми данными можно скачать здесь.
Вот такой вот странный зверь – Erlang.
Erlang – это функциональный язык, специально заточенный для нужд многопоточной разработки. По праву одним из самых лучшим инструментов для параллельного программирования его делают ряд особенностей, перечисленных ниже.
1. Встроенные в язык средства управления потоками – в Erlang`е они называются легковесными процессами – выполняют за вас такие рутинные операции, как: организация пулов, распределение задач между физическими потоками операционной системы, масштабирование на несколько ядер и даже физических машин. Чтобы создать новый процесс достаточно вызвать встроенную функцию spawn следующим образом:
Pid = spawn(module, function, ArgsList)
А чтобы поток создался на другом физическом хосте, где запущена виртуальная машина Erlang`а и которой доступен ваш модуль module с функцией function, достаточно лишь слегка модифицировать изначальный код:
Pid = spawn(instance@HOST, module, function, ArgsList)
Где instance – это имя экземпляра виртуальной машины Erlang`а, запущенной на физическом хосте с именем HOST.2. Поддержка на уровне языка механизма обмена асинхронными сообщениями позволяет вам не думать об организации очереди сообщений, решении проблемы гарантированности доставки, блокировках и так далее. Для передачи сообщений в синтаксисе языка выделен специальный оператор – '!'. Следующая строчка кода отправляет сообщение Message потоку с идентификатором Pid:
Pid ! Message
Забавно, что в качестве сообщения могут выступать данные любого типа.Для приёма сообщений служит ключевое слово receive. Например, в следующем примере демонстрируется приём сообщений 2-ух типов – атома 'stop' (что такое атом я расскажу чуть ниже) и кортежа, состоящего из атома 'work' и произвольных данных:
receive
stop ->
process_stop();
{work, X} ->
process_work(X),
consumer_impl()
end;
stop ->
process_stop();
{work, X} ->
process_work(X),
consumer_impl()
end;
3. Приложение на Erlang`е оперирует только неизменяемыми (immutable) переменными. Это означает, что значение, присвоенное переменной единожды, не может быть изменено впоследствии. Это ограничение позволяет избежать таких проблем, как: side-эффекты, конкуренция за ресурсы и блокировки.
4. Виртуальная машина Erlang`а и его сборщик мусора были изначально спроектированы с прицелом на многопоточную разработку и системы мягкого реального времени. Более подробно про эти особенности можно прочитать здесь.
Ещё несколько деталей синтаксиса языка, чтобы дальнейший код был более понятен. Любая переменная должна начинаться с большой буквы:
Operation = work
Значение в правой части выражения – это атом, специальный тип данных в Erlang`е, представляющий собой просто имя. В отличии от переменных атомы начинаются с маленькой буквы: done, stop, work.Функция определяется следующим образом:
process_work(X) –>
io:format("Do work: ~p~n", [X]),
done.
Данные, записанные в последней строке, считаются неявно возвращаемыми из функции. В данном примере – это атом 'done'.io:format("Do work: ~p~n", [X]),
done.
Комментарии помечаются символом % в начале строки. Более подробную информацию о языке можно найти на сайте разработчиков. На RSDN.RU есть неплохой перевод на русский язык статьи "Getting Started with Erlang User's Guide" из каталога официальной документации: "Начала работы с Erlang".
MapReduce в 50 строк
Итак, вернёмся к MapReduce и попытаемся с его помощью посчитать количество различных слов в романе "Война и мир".
Для начала определим функцию мастера:
% Master takes Input and 2 lambdas: Map-function and Reduce-function
master(Input, Map, Reduce) ->
% run map
MapCount = run_map(Input, Map),
% collect map results and sort them using a lambda for comparison
MapResults = lists:sort(
fun({LeftKey, _}, {RightKey, _}) -> LeftKey =< RightKey end,
wait_results(MapCount, [])),
% run reduce
ReduceCount = run_reduce(MapResults, Reduce),
% collect results
wait_results(ReduceCount, []).
Функция мастера принимает 3 параметра: список обрабатываемых элементов и 2 лямбда-функции. Первая лямбда-функция должна реализовывать процедуру Map, вторая – Reduce. Вместо того, чтобы непрерывно группировать результат, получаемый от Map-агентов, мастер складывает все ответы в единый список (формируется внутри функции wait_results), который сортирует по ключу перед вызовом фазы Reduce.master(Input, Map, Reduce) ->
% run map
MapCount = run_map(Input, Map),
% collect map results and sort them using a lambda for comparison
MapResults = lists:sort(
fun({LeftKey, _}, {RightKey, _}) -> LeftKey =< RightKey end,
wait_results(MapCount, [])),
% run reduce
ReduceCount = run_reduce(MapResults, Reduce),
% collect results
wait_results(ReduceCount, []).
Функция run_map достаточно проста для понимания и не требует дополнительных комментариев:
run_map(Input, Fun) ->
% for each elemt in list call a lambda which
% calls spawn_worker function
lists:foreach(
fun(Element) -> spawn_worker(Element, Fun) end, Input),
length(Input).
% for each elemt in list call a lambda which
% calls spawn_worker function
lists:foreach(
fun(Element) -> spawn_worker(Element, Fun) end, Input),
length(Input).
Гораздо больший интерес представляет собой функция run_reduce. Если быть точным, то в приложении определено 2 функции run_reduce – 2-я и 3-я аргументами. Причём вторая функция имеет несколько специализаций для конкретных значений передаваемых аргументов:
run_reduce([{Key, Value} | SortedList], Fun) ->
run_reduce(Key, [Value], SortedList, Fun, 0).
% This set of run_reduce functions groups the received
% sorted list by Key and calls spawn_worker for each
% pair {Key, ValuesList}
run_reduce(Key, ValueList,
[{Key, Value} | SortedList], Fun, Count) ->
run_reduce(Key, [Value | ValueList],
SortedList, Fun, Count);
run_reduce(Key, ValueList,
[{NewKey, Value} | SortedList], Fun, Count) ->
spawn_worker({Key, ValueList}, Fun),
run_reduce(NewKey, [Value], SortedList, Fun, Count + 1);
run_reduce(Key, ValueList, [], Fun, Count) ->
spawn_worker({Key, ValueList}, Fun),
Count + 1.
Данный набор функций перебирает список пар, отсортированных по первому значению пары – ключу, строит по данному ключу группировку и для каждой новой пары вида {Ключ, Список Значений} вызывает функцию spawn_work.run_reduce(Key, [Value], SortedList, Fun, 0).
% This set of run_reduce functions groups the received
% sorted list by Key and calls spawn_worker for each
% pair {Key, ValuesList}
run_reduce(Key, ValueList,
[{Key, Value} | SortedList], Fun, Count) ->
run_reduce(Key, [Value | ValueList],
SortedList, Fun, Count);
run_reduce(Key, ValueList,
[{NewKey, Value} | SortedList], Fun, Count) ->
spawn_worker({Key, ValueList}, Fun),
run_reduce(NewKey, [Value], SortedList, Fun, Count + 1);
run_reduce(Key, ValueList, [], Fun, Count) ->
spawn_worker({Key, ValueList}, Fun),
Count + 1.
Функция spawn_work вызывается и из функции run_map, и из функции run_reduce:
% Function which schedules a worker process.
% It receives an Element which should be processed and
% a worker function (Map or Reduce)
spawn_worker(Element, Fun) ->
% get current PID
CurrentPID = self(),
% define 2 lambdas: Emit and Worker
Emit =
fun(Key, Val) ->
CurrentPID ! {Key, Val}
end,
Worker =
fun() ->
% Worker calls the received function and
% pass Element and Emit as an arguments
Fun(Element, Emit),
% notify parent that worker is done
CurrentPID ! worker_done
end,
% schedule the worker process
spawn(fun() -> Worker() end).
Она подготавливает 2 лямбда-функции: Emit и Worker, которые использует для запуска нового процесса (лямбда Worker) и передачи в качестве параметра функции-обработчику (лямбда Emit). Эти лямбда-функции нужны для формирования корректных сообщений мастеру. Функция-обработчик (Map или Reduce) будет просто вызывать лямбду Emit, когда у неё будут готовы данные для отправки мастеру.% It receives an Element which should be processed and
% a worker function (Map or Reduce)
spawn_worker(Element, Fun) ->
% get current PID
CurrentPID = self(),
% define 2 lambdas: Emit and Worker
Emit =
fun(Key, Val) ->
CurrentPID ! {Key, Val}
end,
Worker =
fun() ->
% Worker calls the received function and
% pass Element and Emit as an arguments
Fun(Element, Emit),
% notify parent that worker is done
CurrentPID ! worker_done
end,
% schedule the worker process
spawn(fun() -> Worker() end).
В случае обоих фаз мастер собирает результаты с помощью функции wait_results. Эта функция слушает сообщения от агентов и складывает ответы в возвращаемый список:
% This instance of wait_results is called when
% the 1st argument is 0
wait_results(0, Results) ->
Results;
% This instance of wait_results is called when
% the 1st element is not 0
wait_results(Count, Results) –>
% wait for messages
receive
{Key, Val} –>
% extend Results list with the received pair
% and call wait_results recursively
wait_results(Count, [{Key, Val} | Results]);
worker_done –>
% some worker is done: decrease count of
% worker and call wait_results recursively
wait_results(Count - 1, Results)
end.
% the 1st argument is 0
wait_results(0, Results) ->
Results;
% This instance of wait_results is called when
% the 1st element is not 0
wait_results(Count, Results) –>
% wait for messages
receive
{Key, Val} –>
% extend Results list with the received pair
% and call wait_results recursively
wait_results(Count, [{Key, Val} | Results]);
worker_done –>
% some worker is done: decrease count of
% worker and call wait_results recursively
wait_results(Count - 1, Results)
end.
И наконец функции read_words и count_words, выступающие в качестве рабочих функций Map и Reduce агентов соответственно:
read_words(FileName, Emit) ->
{ok, FileContent} = file:read_file(FileName),
Words = string:tokens(
erlang:binary_to_list(FileContent),
" \t\n\r,.;:-!?\"'()"),
lists:foreach(
fun(Word) ->
if
Word /= "" -> Emit(Word, 1);
Word == "" -> false
end
end, Words).
count_words({Word, Counts}, Emit) ->
Emit(Word, length(Counts)).
{ok, FileContent} = file:read_file(FileName),
Words = string:tokens(
erlang:binary_to_list(FileContent),
" \t\n\r,.;:-!?\"'()"),
lists:foreach(
fun(Word) ->
if
Word /= "" -> Emit(Word, 1);
Word == "" -> false
end
end, Words).
count_words({Word, Counts}, Emit) ->
Emit(Word, length(Counts)).
Теперь можно вызвать функцию мастер, передав ей список файлов и 2 функции – read_words и count_words:
Results = master(["p1.txt", "p2.txt", "p3.txt"],
fun read_words/2, fun count_words/2)
fun read_words/2, fun count_words/2)
Для простоты эксперимента, чтобы не загромождать код лишней работой, связанной с обработкой кодировок, я взял электронную версию английского перевода романа "Война и мир", разбил текст руками на 5 примерно равных частей и запустил вычисление с включенным таймером (функция timer:tc).
Результат весьма впечатляющий:
5> {WPTiming, ok} = timer:tc(mapreduce, main, [["wp1.txt", "wp2.txt", "wp3.txt", "wp4.txt", "wp5.txt"]]).
{4078000,ok}
6> WPTiming / 1000000 .
4.078
{4078000,ok}
6> WPTiming / 1000000 .
4.078
На полный подсчёт понадобилось чуть больше 4 секунд на машине с 2-ух ядерным Intel Core 2 Duo. По-моему, это круто.
Архив с полным исходным текстом приложения и тестовыми данными можно скачать здесь.
4 комментария:
Было бы интересно увидеть аналоги кода на java и c# :)
Возможно C# и Java практичнее с точки зрения применения - чаще используются, но пример будет не таким наглядным. В 50 строк боюсь не управиться. В результате за деревьями леса можно не увидеть. А Erlang как нельзя лучше иллюстрирует саму суть идеи. Основываясь на этом примере можно уже и более конкретную реализацию под себя написать. Если будет время - накидаю что-нибудь, но не обещаю.
На Java можно уложиться и в менее чем 50 строк, если использовать готовые фреймворки для MapReduce (Hadoop).
Жаль, что ссылка на полный исходный текст уже не рабочая. Очень хотелось бы посмотреть!
Отправить комментарий