вторник, 26 февраля 2013 г.

Вернёмся к Data Flow, теперь на C#

Некоторое количество заметок назад я рассказывал о шаблоне многозадачного программирования Data Flow и демонстрировал пример его реализации на C++ с помощью библиотеки Asynchronous Agents Library (AAL). Пришла пора ещё раз вспомнить об этом шаблоне: дело в том, что с недавних пор его реализация стала доступной для платформы .NET.

Задача
Вспомним прошлую задачу и постараемся переписать её на C#. Нужно написать простейший командный интерпретатор, который принимает текстовый поток команд, разделённых символом перевода строк, распознаёт каждую команду, выполняет её и отправляет результат выполнения на консоль или в выходной файл. При этом должны выполняться 3 условия:
  • интерпретатор должен быть многозадачным
  • ответы должны поступать на выход в том же порядке, в котором команды поступали на вход
  • мы не должны использовать синхронизацию данных на выходе, чтобы устранить Lock Convoy.
Чтобы решить поставленную задачу со всеми условиями, мы можем использовать шаблон Data Flow или Конвейер Данных. Каждая команда будет проходить через четыре последовательных узла конвейера. Каждый узел выполняет одну строго закреплённую за ним операцию: считать данные, распознать команду, выполнить команду и записать ответ.


Решение
Все необходимые классы, реализующие шаблон Data Flow, располагаются в пространстве имён System.Threading.Tasks.Dataflow и являются частью библиотеки TPL Dataflow Library. Но не трудитесь искать сборку с этой библиотекой в числе стандартных модулей .NET. Её там нет. Она распространяется в виде отдельного NuGet пакета. Поэтому, прежде чем приступать к экспериментам, откройте в вашем проекте консоль менеджера пакетов NuGet и выполните в нём вот такую команду:
Install-Package Microsoft.Tpl.Dataflow
Эта команда скачает все необходимые сборки из репозитория NuGet пакетов и добавит их к вашему проекту.

Как и в прошлый раз начнём с самих команд и определим интерфейс и несколько реализаций – GetTime, Echo и BadCommand для внутреннего использования:
internal interface ICommand
{
  string Execute();

  string Arguments { get; set; }
}

// Usage: gettime [gmt]
internal class GetTime : ICommand
{
  // ...
}

// Usage: echo <text>
internal class Echo : ICommand
{
  // ...
}

// Representation of a bad command
internal class BadCommand : ICommand
{
  // ...
}

// ...

// Fill list of supported commands

Dictionary<string, Type> commands = new Dictionary<string, Type>();
commands.Add(GetTime.Name, typeof(GetTime));
commands.Add(Echo.Name, typeof(Echo));
Детали реализаций самих команд не так интересны, поэтому тут я их опустил, но вы можете найти их в полном исходном тексте проекта по ссылке ниже.

Теперь пришло время заняться самим конвейером. Концепция и структура библиотеки Tasks.Dataflow очень похожа на то, что мы уже видели в Asynchronous Agents Library (AAL), даже названия некоторых элементов в обеих библиотеках созвучны. В случае AAL мы строили конвейер из блоков сообщений (message blocks), теперь же мы будем оперировать блоками потоков данных (dataflow blocks). Описание всех доступных блоков потоков данных можно найти в официальной документации, для наших же целей нам понадобятся два типа блоков: TransformBlock<TInput, TOutput> и ActionBlock<TInput>. Первый тип блоков предназначен для преобразования данных: с помощью делегата, переданного экземпляру блока при его создании, он трансформирует каждый принимаемый элемент данных из типа TInput в тип TOutput. С помощью этого класса мы реализуем блок распознавания команды (parser) и блок исполнения команды (executor):
// create "parser" block
TransformBlock
<string, ICommand> parser =
  new TransformBlock<string, ICommand>(
  line =>
  {
    if (string.IsNullOrEmpty(line))
    {
      ICommand bad = new BadCommand();
      bad.Arguments = "<EMPTY>";
      return bad;
    }

    string [] tokens = line.Split(
      new char[] { ' ', '\t' }, 2);

    Type commandType;
    if (!commands.TryGetValue(
      tokens[0], out commandType))
    {
      ICommand bad = new BadCommand();
      bad.Arguments = line;
      return bad;
    }

    ICommand command = (ICommand)
      Activator.CreateInstance(commandType);
    command.Arguments =
      tokens.Length == 2 ? tokens[1] : null;

    return command;
  }, new ExecutionDataflowBlockOptions {
    CancellationToken = toCancel.Token });


// create "executor" block
TransformBlock<ICommand, string> executor =
  new TransformBlock<ICommand, string>(
  command => command.Execute(),
  new ExecutionDataflowBlockOptions {
    CancellationToken = toCancel.Token });

Второй тип блоков гораздо проще: для каждого принимаемого элемента данных он вызывает делегат, переданный экземпляру блока при его создании. Этот класс мы используем для реализации блока вывода результата (printer):
// create "printer" block
ActionBlock<string> printer =
  new ActionBlock<string>(
  result => System.Console.WriteLine(result),
  new ExecutionDataflowBlockOptions {
    CancellationToken = toCancel.Token });

Теперь можно связать все блоки в один конвейер:
// link blocks together
parser.LinkTo(executor);
executor.LinkTo(printer);

И добавить условия остановки:
// prepare the cancelation
CancellationTokenSource toCancel = new CancellationTokenSource();

// ...

parser.Completion.ContinueWith(
  finishedTask =>
  {
    if (finishedTask.IsFaulted)
    {
      // pass the fault reason to the next block
      ((IDataflowBlock)executor).Fault(
        finishedTask.Exception);
    }
    else
    {
      // tranfer "complete" command to the next block
      executor.Complete();
    }
  });

executor.Completion.ContinueWith(
  finishedTask =>
  {
    if (finishedTask.IsFaulted)
    {
      // pass the fault reason to the next block
      ((IDataflowBlock)printer).Fault(
        finishedTask.Exception);
    }
    else
    {
      // tranfer "complete" command to the next block
      printer.Complete();
    }
  });

Task printerFinish = printer.Completion.ContinueWith(
  finishedTask =>
  {
    if (finishedTask.IsFaulted)
    {
      // cancel all in case of error
      toCancel.Cancel();
    }
  });

Обратите внимание, как блоки передают друг другу либо ошибку, либо сигнал остановки. В случае передачи блоку ошибки он тут же прекращает любую активность и завершается. При этом все вызовы Wait функции для задачи (Task), возвращённой через свойство Completion данного блока, завершаться выбросом исключения. Вызов метода Complete, в отличие от передачи ошибки, не приводит к моментальной остановке блока. Он сигнализирует, что поступление данных больше не ожидается, – блок должен обработать все переданные ему элементы данных до конца и завершиться. Все потоки, ожидающие завершения блока с помощью вызова block.Completion.Wait() завершат ожидание и продолжат своё исполнение.

Исключением является лишь последний блок в конвейере: в случае ошибки он возбуждает сигнал отмены с помощью canceletion token`а. Вы могли обратить внимание, что он использовался и для инициализации всех блоков в конвейере. Таким нехитрым способом мы сможем остановить весь конвейер целиком, не зависимо от того, на каком именно этапе произошла ошибка.

Осталось только открыть файл с командами и запустить конвейер:
using(StreamReader input = File.OpenText(fileName))
{
  CancellationToken cancel = toCancel.Token;

  // read lines
  for (string line = input.ReadLine();
    line != null && !cancel.IsCancellationRequested;
    line = input.ReadLine())
  {
    // transfer next line to the parser
    parser.Post(line);
  }

  // mark block as completed
  parser.Complete();
}

// wait both: printer block and its continuation
Task.WaitAll(printer.Completion, printerFinish);

Обратите внимание на последний нюанс на сегодня. Последней строчкой идёт ожидание завершения конвейера, при этом жду я сразу 2 задачи: первая – задача, связанная с последним блоком конвейера, вторая – её continuation. Собственно признаком завершения конвейера является завершение второй задачи, но если мы будем ждать только её, мы рискуем не получить исключения, произошедшие в самом конвейере. Чтобы этого не произошло, мы должны либо явно проверять их наличие через свойство printer.Completion.Exception, либо добавить задачу, связанную с последним блоком конвейера к списку ожидания – в этом случае исключение автоматически пробросится в контекст потока.

Ну и как водится, ссылка на исходный код всего проекта здесь (но на этот раз для Visual Studio 2012).

2 комментария:

Алексей комментирует...

Задача слишком примитивна для того, чтобы использовать TPL Dataflow Library: граф потоков данных не изменяется, состоит из 4х узлов, каждый узел имеет только один вход. Проще и эффективнее это решается с помощью 4х потоков (threads), соединенных очередями. Было бы интереснее посмотреть действительно сложную задачу, с динамически создаваемыми узлами, некоторые с несколькими входами (представлены с помощью JoinBlock).

Igor комментирует...

по следам двух последних постов, примерчик бы как Data Flow из poweshell вызывать, причем спрятать все обертки от того, кто реализует блоки и flow.
Этот кусок кода ContinueWith уж слишком громоздко выглядит. его бы как то спрятать

Отправить комментарий