Объединение потоков, когда _любой_ из подпотоков имеет готовое значение

Из документации Akka-stream видно, что все параметры слияния потоков (merge, mergeSorted, mergePreferred, zipN, zipWithN) работают, ожидая, когда все объединенные потоки будут готовы к новому элементу, а затем применяя стратегию слияния (объединение элементов в кортеж, или применяя функцию zip и т. д.)

Это хорошо подходит для автономной обработки (например, чтения данных из файлов или HTTP и их объединения), но приводит к задержке при онлайн-обработке. Мне нужно объединить потоки данных, созданные, например. несколько соединений Websocket и доставлять обновления в объединенном потоке, как только любой из исходных потоков создает значение. Пример: если есть исходные потоки A и B, вот что должно быть в объединенном потоке:

Выходной поток начинается с некоторого начального значения, например. (None, None).

(A:1) (B:<not ready>) -> (Some(1), None)
(A:2) (B:<not ready>) -> (Some(2), None)
(A:3) (B:1)           -> (Some(3), Some(1))
(A:3) (B:2)           -> (Some(3), Some(2))

и т. д. Опять же, новое значение появляется в выходном потоке, когда любой исходный поток создает значение немедленно.

Есть ли какой-нибудь комбинатор для достижения этого?


person Alexander Temerev    schedule 28.01.2017    source источник
comment
› ожидая, когда все объединенные потоки будут готовы к новому элементу - это не похоже на правду. См., например, Объединение страница документации. Он четко определяет, что он излучает, когда один из входных данных имеет доступный элемент (выделено мной).   -  person Vladimir Matveev    schedule 28.01.2017


Ответы (1)


Как указано в комментариях, этапы Merge и MergePreferred действительно испускают элементы вниз по течению, даже если не все восходящие потоки имеют доступный элемент.

Из вашего примера похоже, что вы ищете источники архивирования. И да, Zip выдает заархивированный кортеж вниз по течению только тогда, когда у него есть элементы для заархивирования из всех его восходящих потоков. Чтобы преодолеть это, вы можете «поднять» свои источники, чтобы произвести Options, и заставить их излучать None всякий раз, когда больше нечего излучать. Обертка исходного кода может выглядеть так:

  def asOption[In, Mat](source: Source[In, Mat]): Source[Option[In], Mat] =
    Source.fromGraph(GraphDSL.create(source.map(Option(_))) {
      implicit builder: GraphDSL.Builder[Mat] => src =>
      import GraphDSL.Implicits._

      val noneSource = Source.repeat(None)
      val merge = builder.add(MergePreferred[Option[In]](1))

      src        ~> merge.preferred
      noneSource ~> merge.in(0)

      SourceShape(merge.out)
    })

На этом этапе вы можете заархивировать исходники, как обычно.

  val src1: Source[Int, NotUsed] = ???
  val src2: Source[Int, NotUsed] = ???

  val zipped = asOption(src1) zip asOption(src2)
person Stefano Bonetti    schedule 28.01.2017