Как создать поток Akka с противодавлением и контролем

Мне нужно создать функцию со следующим интерфейсом:

import akka.kafka.scaladsl.Consumer.Control

object ItemConversionFlow {

def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
}

Моя проблема в том, что я не знаю, как определить поток таким образом, чтобы он соответствовал описанному выше интерфейсу.

Когда я делаю что-то подобное

val flow = Flow[Item]
    .map(item => doConversion(item)
    .filter(_.isDefined)
    .map(_.get)

результирующий тип — Flow[Item, OtherItem, NotUsed]. Я пока ничего не нашел в документации Akka. Кроме того, функции на akka.stream.scaladsl.Flow предлагают только «Не используется» вместо «Управление». Было бы здорово, если бы кто-то мог указать мне правильное направление.

Некоторая предыстория: мне нужно настроить несколько конвейеров, которые различаются только в части преобразования. Эти пайплайны являются подпотоками к основному потоку, который по какой-то причине может быть остановлен (соответствующее сообщение поступает в какую-то тему кафки). Поэтому мне нужна часть управления. Идея состоит в том, чтобы создать шаблон Graph, в который я просто вставлю упомянутый поток в качестве аргумента (фабрика, возвращающая его). Для конкретного случая у нас есть решение, которое работает. Чтобы обобщить, мне нужен такой поток.


person Stoecki    schedule 17.01.2018    source источник
comment
Что такое Контроль и когда он создается? (вы можете создать его в mapMaterializedvalue) или в пользовательской GraphStage, или при использовании GraphDSL   -  person Viktor Klang    schedule 17.01.2018
comment
Я посмотрю. Что касается управления: Control = akka.kafka.scaladsl.Consumer.Control @ Viktor Klang   -  person Stoecki    schedule 17.01.2018


Ответы (1)


У вас действительно есть обратное давление. Однако подумайте о том, что вам действительно нужно в противодавлении... вы не используете асинхронные этапы для увеличения пропускной способности... например. Обратное давление позволяет избежать чрезмерного роста количества подписчиков https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html. В вашем образце не беспокойтесь об этом, ваш поток будет запрашивать новые элементы у издателя в зависимости от того, сколько времени потребуется для завершения doConversion.

Если вы хотите получить результат потока, используйте toMat или viaMat. Например, если ваш поток испускает Item и преобразует их в OtherItem:

val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
  .map(item => doConversion(item))
  .filter(_.isDefined)
  .map(_.get)
  .toMat(Sink.fold(List[OtherItem]())((a, b) => {
      // Examine the result of your stream
      b :: a
    }))(Keep.right)
  .run()

str будет Future[List[OtherItem]]. Попробуйте экстраполировать это на свой случай.

Или, используя toMat с KillSwitches, «Создает новый [[Graph]] из [[FlowShape]], который материализуется во внешний переключатель, который позволяет внешнее завершение * этой уникальной материализации. Различные материализации приводят к разным, независимым переключениям».

  def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
    Flow[Item]
      .map(item => doConversion(item))
      .filter(_.isDefined)
      .map(_.get)
      .viaMat(KillSwitches.single)(Keep.right)
  }


  val stream = 
    Source.fromIterator(() => List(Item(Some(1))).toIterator)
    .viaMat(build(StreamConfig(1)))(Keep.right)
    .toMat(Sink.ignore)(Keep.both).run

  // This stops the stream
  stream._1.shutdown()

  // When it finishes
  stream._2 onComplete(_ => println("Done"))
person Emiliano Martinez    schedule 17.01.2018
comment
Это не совсем то, что я хочу. Мне действительно нужно что-то, возвращающее мне поток, выполняющий интерфейс, как указано выше. Поэтому мне нужен поток типа Flow[Item, OtherItem, Control]. Ваше решение создаст RunnableGraph. Я отредактировал свой вопрос, чтобы быть более конкретным. Спасибо, пока - person Stoecki; 17.01.2018
comment
проверьте это, если это может помочь - person Emiliano Martinez; 17.01.2018