Я думаю, что будет сложно (или невозможно) реализовать это с помощью супервизорной стратегии, в основном потому, что вы хотите повторить «n» раз (согласно обсуждению в комментариях), и я не думаю, что вы можете отследить количество раз элемент был опробован при использовании наблюдения.
Я думаю, что есть два пути решения этой проблемы. Либо обработайте рискованную операцию как отдельный поток, либо создайте граф, который будет обрабатывать ошибки. Я предложу два решения.
Также обратите внимание, что Akka Streams отличает ошибки от сбоев, поэтому, если вы не будете обрабатывать свои сбои, они в конечном итоге свернут поток (если не введена никакая стратегия), поэтому в приведенном ниже примере я конвертирую их в Either
, что представляют либо успех, либо ошибку.
Отдельный поток
Что вы можете сделать, так это рассматривать каждую букву алфавита как отдельный поток и обрабатывать сбои для каждой буквы отдельно с помощью стратегии повторных попыток и некоторой задержки.
// this comes after your helloFormat
// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
f.recoverWith {
// you may want to only handle certain exceptions here...
case ex: Exception if c > 0 =>
println(s"failed - will retry ${c - 1} more times")
akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
}
val singleElementFlow = httpFlow.mapAsync[Hello](1) {
case (Success(response), _) =>
val f = Unmarshal(response).to[Hello]
f.recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
}
case (Failure(e), _) => Future.failed(e)
}
// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
println(s"trying $elem")
retry(
Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
1.seconds, 3
).map(ok => Right(ok)).recover { case ex => Left(ex) }
}
// end
График
Этот метод будет интегрировать сбои в график и разрешать повторные попытки. В этом примере все запросы выполняются параллельно и вы предпочитаете повторять те, которые не удалось выполнить, но если вы не хотите такого поведения и запускаете их один за другим, это то, что вы также можете сделать, я думаю.
// this comes after your helloFormat
// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]
def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val searches = b.add(Flow[Char])
val tryParse =
Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
case (Success(response), (req, tries)) =>
println(s"trying parse response to $req for $tries")
Unmarshal(response).to[Hello].
map(h => Right(h)).
recoverWith {
case ex: Exception =>
// see https://github.com/akka/akka/issues/20192
response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
Left((req, tries + 1))
}
}
case (Failure(e), _) => Future.failed(e)
}
val broadcast = b.add(Broadcast[ParseResult](2))
val nonErrors = b.add(Flow[ParseResult].collect {
case Right(x) => x
// you may also handle here Lefts which do exceeded retries count
})
val errors = Flow[ParseResult].collect {
case Left(x) if x._2 < 3 => (x._1, x)
}
val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))
// @formatter:off
searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
merge.preferred <~ errors <~ broadcast
// @formatter:on
FlowShape(searches.in, nonErrors.out)
}
def main(args: Array[String]): Unit = {
val source = Source('a' to 'z')
val sink = Sink.seq[Hello]
source.via(g).toMat(sink)(Keep.right).run().onComplete {
case Success(seq) =>
println(seq)
case Failure(ex) =>
println(ex)
}
}
По сути, здесь происходит то, что мы запускаем поиск по httpFlow
, а затем пытаемся проанализировать ответ, затем мы транслируем результат и разделяем ошибки и не ошибки, не ошибки отправляются в сток, а ошибки отправляются обратно в цикл. Если количество повторных попыток превышает count, мы игнорируем элемент, но вы также можете сделать с ним что-то еще.
Во всяком случае, я надеюсь, что это дает вам некоторое представление.
person
lpiepiora
schedule
01.04.2016
mapAsync
, например.withAttributes(supervisionStrategy(resumingDecider))
(но это в документе, который вы связали, поэтому я предполагаю, что вам нужно что-то еще) - person lpiepiora   schedule 31.03.2016v
не удается с 500, я хочу повторить попыткуv
еще два раза. - person Guillaume Massé   schedule 31.03.2016