Как повторить неудачную попытку Unmarshalling потока запросов akka-http?

Я знаю, что можно перезапустить поток akka при ошибке с помощью стратегии наблюдения на ActorMaterialzer.

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

источник: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html

У меня есть следующий вариант использования.

/***
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-experimental"            % "2.4.2",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future

object SO extends DefaultJsonProtocol {

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")

  def search(query: Char) = {
    val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
    (request, request)
  }

  case class Hello(name: String)
  implicit val helloFormat = jsonFormat1(Hello)

  val searches =
    Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
      case (Success(response), _) => Unmarshal(response).to[Hello]
      case (Failure(e), _) => Future.failed(e)
    }

  def main(): Unit = {
    Await.result(searches.runForeach(_ => println), Duration.Inf)
    ()
  }
}

Иногда запрос не может быть рассортирован. Я хочу использовать стратегию повтора для этого единственного запроса https://example.org/?q=v без перезапуска всего алфавита.


person Guillaume Massé    schedule 31.03.2016    source источник
comment
Вы хотите использовать тот же запрос со следующей буквой или хотите повторить тот же запрос с той же буквой, или вы хотите использовать другой запрос? Если вы хотите попробовать только следующий запрос, вы можете добавить стратегию наблюдения к mapAsync, например .withAttributes(supervisionStrategy(resumingDecider)) (но это в документе, который вы связали, поэтому я предполагаю, что вам нужно что-то еще)   -  person lpiepiora    schedule 31.03.2016
comment
да повторите один запрос, та же буква. Если v не удается с 500, я хочу повторить попытку v еще два раза.   -  person Guillaume Massé    schedule 31.03.2016
comment
Вам нужно запустить его как часть вашего потока, потому что вы можете запустить его как отдельный поток, а затем повторить попытку, используя стандартные будущие механизмы, что было бы как-то проще, я думаю. В противном случае, я думаю, вам нужно создать график, который будет возвращать неудачные запросы. Я могу показать вам любое или оба решения, если вы заинтересованы.   -  person lpiepiora    schedule 01.04.2016
comment
Меня интересует графическое решение   -  person Guillaume Massé    schedule 01.04.2016


Ответы (2)


Я думаю, что будет сложно (или невозможно) реализовать это с помощью супервизорной стратегии, в основном потому, что вы хотите повторить «n» раз (согласно обсуждению в комментариях), и я не думаю, что вы можете отследить количество раз элемент был опробован при использовании наблюдения.

Я думаю, что есть два пути решения этой проблемы. Либо обработайте рискованную операцию как отдельный поток, либо создайте граф, который будет обрабатывать ошибки. Я предложу два решения.

Также обратите внимание, что Akka Streams отличает ошибки от сбоев, поэтому, если вы не будете обрабатывать свои сбои, они в конечном итоге свернут поток (если не введена никакая стратегия), поэтому в приведенном ниже примере я конвертирую их в Either, что представляют либо успех, либо ошибку.

Отдельный поток

Что вы можете сделать, так это рассматривать каждую букву алфавита как отдельный поток и обрабатывать сбои для каждой буквы отдельно с помощью стратегии повторных попыток и некоторой задержки.

// this comes after your helloFormat

// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
  f.recoverWith {
    // you may want to only handle certain exceptions here...
    case ex: Exception if c > 0 =>
      println(s"failed - will retry ${c - 1} more times")
      akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
  }

val singleElementFlow = httpFlow.mapAsync[Hello](1) {
  case (Success(response), _) =>
    val f = Unmarshal(response).to[Hello]
    f.recoverWith {
      case ex: Exception =>
        // see https://github.com/akka/akka/issues/20192
        response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
    }
  case (Failure(e), _) => Future.failed(e)
}

// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
  Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
    println(s"trying $elem")
    retry(
      Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
      1.seconds, 3
    ).map(ok => Right(ok)).recover { case ex => Left(ex) }
  }
// end

График

Этот метод будет интегрировать сбои в график и разрешать повторные попытки. В этом примере все запросы выполняются параллельно и вы предпочитаете повторять те, которые не удалось выполнить, но если вы не хотите такого поведения и запускаете их один за другим, это то, что вы также можете сделать, я думаю.

// this comes after your helloFormat

// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]

def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
  val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
  (request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}

val g = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val searches = b.add(Flow[Char])

  val tryParse =
    Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
      case (Success(response), (req, tries)) =>
        println(s"trying parse response to $req for $tries")
        Unmarshal(response).to[Hello].
          map(h => Right(h)).
          recoverWith {
            case ex: Exception =>
              // see https://github.com/akka/akka/issues/20192
              response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
                Left((req, tries + 1))
              }
          }
      case (Failure(e), _) => Future.failed(e)
    }

  val broadcast = b.add(Broadcast[ParseResult](2))

  val nonErrors = b.add(Flow[ParseResult].collect {
    case Right(x) => x
    // you may also handle here Lefts which do exceeded retries count
  })

  val errors = Flow[ParseResult].collect {
    case Left(x) if x._2 < 3 => (x._1, x)
  }
  val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))

  // @formatter:off
  searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
                          merge.preferred <~ errors <~ broadcast
  // @formatter:on

  FlowShape(searches.in, nonErrors.out)
}

def main(args: Array[String]): Unit = {
  val source = Source('a' to 'z')
  val sink = Sink.seq[Hello]

  source.via(g).toMat(sink)(Keep.right).run().onComplete {
    case Success(seq) =>
      println(seq)
    case Failure(ex) =>
      println(ex)
  }

}

По сути, здесь происходит то, что мы запускаем поиск по httpFlow, а затем пытаемся проанализировать ответ, затем мы транслируем результат и разделяем ошибки и не ошибки, не ошибки отправляются в сток, а ошибки отправляются обратно в цикл. Если количество повторных попыток превышает count, мы игнорируем элемент, но вы также можете сделать с ним что-то еще.

Во всяком случае, я надеюсь, что это дает вам некоторое представление.

person lpiepiora    schedule 01.04.2016
comment
Большое усилие. Я открою награду за ваш ответ. Я думаю, что Graph должен быть FlowShape, потому что я все еще хочу обработать Hello после. Также вы уверены, что это слияние правильно? stackoverflow.com/a/33962702/449071 - person Guillaume Massé; 01.04.2016
comment
Если вы хотите добавить ввод извне и обработать его после, тогда да, если вы просто хотите обработать Hallo после, но он исходит из Graph, это может быть SourceShape. Тогда вы не оборачиваете его в RunnableGraph, и вы можете запустить его как Source.fromGraph(g).runWith(Sink.seq[Hello]) (например). Да, если вам нужно, чтобы ваш поток был завершен, вам нужно сделать слияние активным, как в ссылке, которую вы отправили. Я обновлю свой ответ. - person lpiepiora; 01.04.2016

Для приведенного выше решения с потоками любые повторные попытки для последнего элемента в потоке выполняться не будут. Это потому, что когда восходящий поток завершается после отправки последнего элемента, слияние также завершается. После этого единственный вывод пришел из выхода без повторной попытки, но поскольку элемент переходит к повторной попытке, он также завершается.

Если вам нужны все входные элементы для генерации выходных данных, вам понадобится дополнительный механизм, чтобы предотвратить завершение восходящего потока от достижения графа процесса и повтора. Одна из возможностей — использовать BidiFlow, который отслеживает входные и выходные данные из графа процессов и повторов, чтобы гарантировать, что все необходимые выходные данные были сгенерированы (для наблюдаемых входных данных) перед распространением oncomplete. В простом случае это может быть просто подсчет входных и выходных элементов.

person leachbj    schedule 07.05.2016