Глобальная поблочная обработка ошибок в конвейере потока данных

Я разрабатываю долгосрочный конвейер потока данных, состоящий из нескольких блоков. Элементы подаются во входной блок конвейера, в конечном итоге проходят через него и отображаются в пользовательском интерфейсе в конце (в знак любезности для пользователя - настоящая задача конвейера - сохранять результаты обработки на диск).

Лямбда-функции внутри блоков конвейера могут вызывать исключения по разным причинам (неправильный ввод, сбой сети, ошибка во время вычислений и т. Д.). В этом случае, вместо того, чтобы отказываться от всего конвейера, я хотел бы исключить проблемный элемент и отобразить его в пользовательском интерфейсе в разделе «Ошибки».

Как лучше всего это сделать? Я понимаю, что могу обернуть каждую лямбда-функцию в try / catch:

var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...)

var workerBlock = new TransformBlock<WorkItem, WorkItem>(item => 
{
    try {
        return DoStuff(item);
    } catch (Exception ex) {
        errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
        return null;
    }
}

Но у меня в конвейере около 10 блоков, и копировать / вставлять этот код в каждый кажется глупым. Кроме того, мне не нравится идея возврата null, так как теперь все нижележащие блоки должны будут его проверять.

Моя следующая лучшая идея - создать функцию, которая возвращает лямбду, которая будет обертывать за меня:

  private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
  {
     return arg =>
     {
        try {
           return f(arg);
        } catch (Exception ex) {
           errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
           return default(TResult);
        }
     };
  }

Но это кажется слишком мета. Есть ли способ лучше ?


person Bugmaster    schedule 12.08.2015    source источник
comment
Вы можете взглянуть на минималистичную Try библиотеку Стивена Клири. Он позволяет передавать сообщение через все блоки конвейера, а затем наблюдать за любыми исключениями, которые произошли с этим сообщением в конце.   -  person Theodor Zoulias    schedule 28.08.2020


Ответы (1)


Это очень интересная тема.

Вы можете определить фильтры при связывании блоков, что означает, что вы можете перенаправить результаты ошибок на блоки обработки ошибок. Для этого блоки должны возвращать «мета» объекты, которые содержат как результаты их обработки, так и, по крайней мере, индикатор неудачи / успеха.

Эту идею лучше описать в Железнодорожно-ориентированном программировании, где каждая функция в цепочке обрабатывает успешные результаты или перенаправляет неудачные результаты на «неудачный трек» для возможной регистрации.

На практике это означает, что вы должны добавить две ссылки после каждого блока: одну с условием фильтрации, которое перенаправляет на блок обработки ошибок, и одну ссылку по умолчанию, которая переходит к следующему шагу в потоке.

Вы даже можете объединить эти две идеи для устранения частичных отказов. Результат частичного отказа будет содержать как индикатор отказа, так и полезную нагрузку. Вы можете перенаправить результат в блок регистрации, прежде чем передавать его следующему шагу.

Я обнаружил, что намного проще четко указывать статус каждого сообщения, чем пытаться определить его статус, проверяя нулевые, отсутствующие значения и т. Д. Это означает, что блоки должны заключать свои результаты в "конверт" «объекты, содержащие флаги состояния, результаты и / или любые ошибки.

person Panagiotis Kanavos    schedule 12.08.2015
comment
Спасибо, я думаю, что понимаю концепцию, но это не отвечает на мой вопрос. Как я могу автоматически гарантировать, что все мои блоки выдают элемент, сигнализирующий об ошибке, всякий раз, когда их функции обработки вызывают ошибки? Как я уже сказал в вопросе, я могу вручную обернуть их все в try / catch, но должен быть способ получше ... - person Bugmaster; 13.08.2015