Akka Streams (Scala): фильтрация исключений

Одним из шагов в моем конвейере akka streams является преобразование, которое выдает исключение при получении недопустимого ввода. Я хотел бы отказаться от этих проблемных входных данных. Итак, я придумал следующее решение:

...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...

Что занимает 3 шага для того, что на самом деле является просто плоской картой.

Есть ли более простой способ akka сделать это?


person david    schedule 18.05.2021    source источник


Ответы (2)


Вы можете использовать стратегии супервизии. Взято из документа:

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}

val flow = Flow[Int]
  .filter(100 / _ < 50)
  .map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))

Вы можете настроить Decider для выполнения всего, что вам нужно. Если вам нужно пропустить этот элемент для всех исключений, используйте

 case _: Throwable => Supervision.Resume

Взгляните на https://doc.akka.io/docs/akka/current/stream/stream-error.html

person Emiliano Martinez    schedule 18.05.2021

Если вы хотите автоматически отбрасывать исключения, как указано в примере кода, вот несколько способов сократить количество шагов:

// A dummy transformation
def transformation(i: Int): Int = 100 / i

// #1: Use `collect`
Source(List(5, 2, 0, 1)).
  map(input => Try(transformation(input)).toOption).
  collect{ case x if x.nonEmpty => x.get }.
  runForeach(println)
  // Result: 20, 50, 100

// #2: Use `mapConcat`
Source(List(5, 2, 0, 1)).
  mapConcat(input => List(Try(transformation(input)).toOption).flatten).
  runForeach(println)
  // Result: 20, 50, 100

Обратите внимание, что для Akka Source/Flow нет flatMap, хотя mapConcatflatMapConcat) работает примерно так же.

person Leo C    schedule 18.05.2021