этап пользовательского графа akka stream

У меня есть поток akka из веб-сокета, например поток akka использует веб-сокет и например, построить повторно используемый этап графика (inlet: поток, FlowShape: добавить дополнительное поле в JSON, указывающее источник, т.е.

{
...,
"origin":"blockchain.info"
}

и outlet к Кафке.

Я столкнулся со следующими 3 проблемами:

  • не могу обернуть голову вокруг создания пользовательского Inlet из потока веб-сокетов
  • невозможно интегрировать kafka напрямую в поток (см. код ниже)
  • не уверен, потребуется ли преобразователь для добавления дополнительного поля для десериализации json для добавления origin

Пример проекта (только поток) выглядит так:

import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val incoming: Sink[Message, Future[Done]] =
    Flow[Message].mapAsync(4) {
      case message: TextMessage.Strict =>
        println(message.text)
        Future.successful(Done)
      case message: TextMessage.Streamed =>
        message.textStream.runForeach(println)
      case message: BinaryMessage =>
        message.dataStream.runWith(Sink.ignore)
    }.toMat(Sink.last)(Keep.right)

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both)
      // TODO not working integrating kafka here
      // .map(_.toString)
      //    .map { elem =>
      //      println(s"PlainSinkProducer produce: ${elem}")
      //      new ProducerRecord[Array[Byte], String]("topic1", elem)
      //    }
      //    .runWith(Producer.plainSink(producerSettings))
      .run()

val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      system.terminate
    }
  }

// kafka that works / writes dummy data
val done1 = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

person Georg Heiler    schedule 02.05.2017    source источник


Ответы (1)


Одна проблема связана со стадией incoming, которая моделируется как Sink. где он должен быть смоделирован как Flow. для последующей передачи сообщений в Kafka.

Поскольку входящие текстовые сообщения могут быть Streamed. вы можете использовать комбинатор flatMapMerge следующим образом, чтобы избежать необходимости хранить в памяти целые (потенциально большие) сообщения:

  val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
    case msg: BinaryMessage =>
      msg.dataStream.runWith(Sink.ignore)
      Future.successful(None)
    case TextMessage.Streamed(src) =>
      src.runFold("")(_ + _).map { msg => Some(msg) }
  }.collect {
    case Some(msg) => msg
  }

На данный момент у вас есть что-то, что создает строки и может быть подключено к Kafka:

  val addOrigin: Flow[String, String, NotUsed] = ???

  val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .via(incoming)
      .via(addOrigin)
      .map { elem =>
        println(s"PlainSinkProducer produce: ${elem}")
        new ProducerRecord[Array[Byte], String]("topic1", elem)
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.both)
      .run()
person Stefano Bonetti    schedule 03.05.2017
comment
Это отлично работает в качестве стартера! Но как я могу 1) создать преобразователь, который добавляет "origin":"blockchain.info" поле и 2) создать из него этап графика? (здесь пример исполняемого кода github.com/geoHeil/akkaStreamsIngest) - person Georg Heiler; 04.05.2017
comment
1) просто добавьте поток после incoming, который добавляет в сообщение все, что вы хотите (см. измененный код) 2) из ​​чего вы хотите создать этап графика? - person Stefano Bonetti; 04.05.2017