Повторите попытку с небольшими исключениями для долгоживущего актера akka

У меня есть актер, который создается при запуске приложения как дочерний элемент другого актера и получает сообщение один раз в день от родителя для выполнения операции по извлечению некоторых файлов с какого-либо SFTP-сервера.

Теперь могут быть некоторые незначительные временные исключения подключения, которые приводят к сбою операции. В этом случае требуется повторная попытка.

Но может быть случай, когда возникает исключение, которое не будет разрешено при повторной попытке (например, файл не найден, какая-то конфигурация неверна и т. д.).

Итак, что в этом случае может быть подходящим механизмом повторных попыток и стратегией контроля, учитывая, что актор будет получать сообщения через длительный интервал (один раз в день).

В этом случае сообщение, отправленное актору, не является плохим вводом — это просто триггер. Пример:

case object FileFetch

Если у меня есть такая стратегия наблюдения в родительском объекте, она будет перезапускать неисправный дочерний элемент при каждом незначительном/основном исключении без повторных попыток.

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: Exception                => Restart
}

Я хочу иметь что-то вроде этого:

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: MinorException           => Retry same message 2, 3 times and then Restart
    case _: Exception                => Restart
}

person ak0817    schedule 25.01.2018    source источник


Ответы (1)


«Повторная попытка» или повторная отправка сообщения в случае исключения — это то, что вы должны реализовать самостоятельно. Из документации< /а>:

Если во время обработки сообщения (т. е. извлечения из почтового ящика и передачи его текущему поведению) возникает исключение, то это сообщение будет потеряно. Важно понимать, что обратно в почтовый ящик его не кладут. Поэтому, если вы хотите повторить обработку сообщения, вам нужно справиться с этим самостоятельно, перехватив исключение и повторив попытку. Убедитесь, что вы установили ограничение на количество повторных попыток, так как вы не хотите, чтобы система была заблокирована (чтобы потреблять много циклов процессора без прогресса).

Если вы хотите повторно отправить сообщение FileFetch дочернему элементу в случае MinorException без перезапуска дочернего элемента, вы можете перехватить исключение в дочернем элементе, чтобы избежать запуска стратегии контроля. В блоке try-catch вы можете отправить сообщение родителю и попросить родителя отслеживать количество повторных попыток (и, возможно, включить в это сообщение отметку времени, если вы хотите, например, чтобы родитель принял какую-то политику отсрочки) . У ребенка:

def receive = {
  case FileFetch =>
    try {
      ...
    } catch {
      case m: MinorException =>
        val now = System.nanoTime
        context.parent ! MinorIncident(self, now)
    }
  case ...
} 

В родительском:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case _: Exception => Restart
  }

var numFetchRetries = 0

def receive = {
  case MinorIncident(fetcherRef, time) =>
    log.error(s"${fetcherRef} threw a MinorException at ${time}")
    if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff
      numFetchRetries = numFetchRetries + 1
      fetcherRef ! FileFetch
    } else {
      numFetchRetries = 0
      context.stop(fetcherRef)
      ... // recreate the child
    }
  case SomeMsgFromChildThatFetchSucceeded =>
    numFetchRetries = 0
  case ...
}

В качестве альтернативы, вместо того, чтобы перехватывать исключение в дочернем элементе, вы можете установить стратегию супервизора на Resume дочерний элемент в случае MinorException, в то время как родитель по-прежнему будет обрабатывать логику повторной отправки сообщения:

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
    case m: MinorException =>
      val child = sender()
      val now = System.nanoTime
      self ! MinorIncident(child, now)
      Resume
    case _: Exception => Restart
  }
person Jeffrey Chung    schedule 25.01.2018