Как быть с источником, излучающим будущее [T]?

Допустим, у меня есть итератор:

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...)


И я хочу создать исходный код из этого итератора:

val source: Source[Future[Int], NotUsed] =
  Source.fromIterator(() => nextElemIter)


Итак, теперь мой источник выдает Futures. Я никогда не видел, чтобы фьючерсы передавались между этапами в документации Akka или где-либо еще, поэтому вместо этого я всегда мог сделать что-то вроде этого:

val source: Source[Int, NotUsed] = 
  Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */)


А теперь у меня есть штатный источник, который выдает T вместо Future[T]. Но это кажется хакерским и неправильным.

Как правильно поступать в таких ситуациях?


person Ori Popowski    schedule 28.09.2016    source источник
comment
Я думаю, что mapAsync здесь прекрасно. Ведь он предназначен как раз для этого - расплющивать фьючерсы в потоки.   -  person Vladimir Matveev    schedule 28.09.2016
comment
mapAsync(1)(identity) - правильный способ сделать это.   -  person expert    schedule 28.09.2016
comment
@expert отредактировал.   -  person Ori Popowski    schedule 29.09.2016
comment
@VladimirMatveev У меня больше сложилось впечатление, что значения, не относящиеся к будущему, должны проходить между этапами, а mapAsync был больше для фьючерсов, которые создаются внутри этапа, а не приходят уже как будущее из извне . Хотя я могу ошибаться. Просто кажется странным, что нет Source конструктора, который сам сглаживает фьючерсы   -  person Ori Popowski    schedule 29.09.2016


Ответы (1)


Отвечая прямо на ваш вопрос: я согласен с комментарием Владимира о том, что нет ничего "хакерского" в использовании mapAsync для описанной вами цели. Я не могу придумать более прямого способа развернуть Future из ваших базовых Int значений.

Отвечая на ваш вопрос косвенно ...

Попробуйте придерживаться фьючерсов

Потоки как механизм параллелизма невероятно полезны, когда требуется противодавление. Однако чистые Future операции также находят свое место в приложениях.

Если ваш Iterator[Future[Int]] собирается производить известное ограниченное количество Future значений, тогда вы можете использовать Futures для параллелизма.

Представьте, что вы хотите отфильтровать, сопоставить и уменьшить значения Int.

def isGoodInt(i : Int) : Boolean = ???         //filter
def transformInt(i : Int) : Int = ???          //map
def combineInts(i : Int, j : Int) : Int = ???  //reduce

Фьючерсы предоставляют прямой способ использования этих функций:

val finalVal : Future[Int] = 
  Future sequence { 
    for {
      f <- nextElemIter.toSeq  //launch all of the Futures
      i <- f if isGoodInt(i)
    } yield transformInt(i)
  } map (_ reduce combineInts)

По сравнению с несколько косвенным способом использования Stream, как вы предложили:

val finalVal : Future[Int] = 
  Source.fromIterator(() => nextElemIter)
        .via(Flow[Future[Int]].mapAsync(1)(identity))
        .via(Flow[Int].filter(isGoodInt))
        .via(Flow[Int].map(transformInt))
        .to(Sink.reduce(combineInts))
        .run()
person Ramón J Romero y Vigil    schedule 28.09.2016