Как связать akka http с потоками akka?

Я пытаюсь использовать потоки вместо чистых актеров для обработки HTTP-запросов, и у меня есть следующий код:

trait ImagesRoute {

  val log = LoggerFactory.getLogger(this.getClass)

  implicit def actorRefFactory: ActorRefFactory
  implicit def materializer: ActorMaterializer

  val source =
    Source
      .actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
      .via(Flow[Image].mapAsync(1)(ImageRepository.add))
      .toMat(Sink.asPublisher(true))(Keep.both)

  val route = {
    pathPrefix("images") {
      pathEnd {
        post {
          entity(as[Image]) { image =>

            val (ref, publisher) = source.run()

            val addFuture = Source.fromPublisher(publisher)

            val future = addFuture.runWith(Sink.head[Option[Image]])

            ref ! image

            onComplete(future.mapTo[Option[Image]]) {
              case Success(img) =>
                complete(Created, img)

              case Failure(e) =>
                log.error("Error adding image resource", e)
                complete(InternalServerError, e.getMessage)
            }
          }
        }
      }
    }
  }
}

Я не уверен, что это правильный способ сделать это, или даже если это хороший подход, или если я должен использовать актера для взаимодействия с маршрутом, используя шаблон запроса, а затем внутри актера транслировать все.

Любые идеи?


person Thiago Pereira    schedule 21.09.2016    source источник
comment
Если не ошибаюсь, в вашем случае потоки вообще не нужны. Насколько я понимаю, метод ImageRepository.add возвращает Future; все, что вам нужно сделать, это написать onComplete(ImageRepository.add(image)), и все.   -  person Vladimir Matveev    schedule 21.09.2016
comment
@VladimirMatveev да, верно, это всего лишь простой пример, но конвейер потока должен быть больше, делая много вещей, таких как связь с внешними ресурсами и, в конечном итоге, вещи обратного давления ...   -  person Thiago Pereira    schedule 21.09.2016


Ответы (1)


Если вы ожидаете только 1 изображение от объекта, вам не нужно создавать Source из ActorRef, и вам не нужно Sink.asPublisher, вы можете просто использовать Source.single:

def imageToComplete(img : Option[Image]) : StandardRoute = 
  img.map(i => complete(Created, i))
     .getOrElse {
       log error ("Error adding image resource", e)
       complete(InternalServerError, e.getMessage
     }

...

entity(as[Image]) { image =>

  val future : Future[StandardRoute] = 
    Source.single(image)
          .via(Flow[Image].mapAsync(1)(ImageRepository.add))
          .runWith(Sink.head[Option[Image]])
          .map(imageToComplete)

  onComplete(future)
}

Дальнейшее упрощение кода: тот факт, что вы обрабатываете только одно изображение, означает, что потоки не нужны, поскольку нет необходимости в обратном давлении с одним элементом:

val future : Future[StandardRoute] = ImageRepository.add(image)
                                                    .map(imageToComplete)

onComplete(future)

В комментариях вы указали

«это всего лишь простой пример, но конвейер потока должен быть больше, выполняя множество вещей, таких как связь с внешними ресурсами и, в конечном итоге, меры противодействия»

Это применимо только в том случае, если ваша сущность была потоком изображений. Если вы когда-либо обрабатываете только 1 изображение на HttpRequest, тогда обратное давление никогда не применяется, и любой поток, который вы создаете, будет более медленная версия Future.

Если ваша сущность на самом деле является потоком изображений, вы можете использовать ее как часть потока:

val byteStrToImage : Flow[ByteString, Image, _] = ???

val imageToByteStr : Flow[Image, Source[ByteString], _] = ???

def imageOptToSource(img : Option[Image]) : Source[Image,_] =
  Source fromIterator img.toIterator

val route = path("images") {
  post {
    extractRequestEntity { reqEntity =>

      val stream = reqEntity.via(byteStrToImage)
                            .via(Flow[Image].mapAsync(1)(ImageRepository.add))
                            .via(Flow.flatMapConcat(imageOptToSource))
                            .via(Flow.flatMapConcat(imageToByteStr))

      complete(HttpResponse(status=Created,entity = stream))
    }
  }
}    
person Ramón J Romero y Vigil    schedule 22.09.2016
comment
Спасибо, это имеет смысл - person Thiago Pereira; 22.09.2016