Как дождаться бесплатного актера Akka при обработке потока данных с помощью Plays Iteratee

У меня есть бесконечный поток с сообщениями, представленными как Plays Enumerator, к которым я применяю Iteratee. Затем каждое сообщение обрабатывается актором Akka (количество актеров ограничено 10).

Теперь я хотел бы, чтобы код в Iteratee асинхронно ждал свободного актера, если все 10 актеров заняты, и не отправлял им другие сообщения, что приводит к исключению Ask timed out on ....

Как добиться такой функциональности? Есть ли лучший способ обработки бесконечного потока с 10 актерами без await?

Пример кода, о котором я говорил, может выглядеть так:

val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(10)))
val it = Iteratee.foreach[Msg] { msg => 
  workers ? msg
}

msgEnumerator.apply(it)

person kurochenko    schedule 21.01.2016    source источник


Ответы (1)


Использование Iteratee.foldM с шаблоном запроса актера, который у вас есть, кажется правильным подходом. Предполагая, что вы не хотите, чтобы ваши акторы создавали большие почтовые ящики (если вам не нужны большие почтовые ящики, просто используйте tell и Iteratee.foreach вместо ask). Это потребует некоторой специальной логики маршрутизации. Поскольку API для создания пользовательского маршрутизатора akka не поддерживает асинхронность, вам понадобится собственный актор для обработки логики распределения только одной части работы для каждого актера в вашем пуле актеров за раз.

Я представляю, как это работает примерно так:

class WorkDistributor extends Actor {
  final val NUM_WORKERS = 10
  val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(NUM_WORKERS))) 

  var numActiveWorkers = 0
  var queuedWork: Option[Work] = None

  def receive = {
    case IterateeWork(work) if numActiveWorkers < NUM_WORKERS => workers ! work; numActiveWorkers += 1; sender ! SendMeMoreWork
    case IterateeWork(work) => queuedWork = Some(work)
    case ActorFinishedWork if queuedWork.isDefined => queuedWork.foreach(workers ! _); queuedWork = None
    case ActorFinishedWork => numActiveWorkers -= 1; sender ! SendMeMoreWork
  }
}

Где сообщение IterateeWork отправляется итерируемым, а сообщение ActorFinishedWork отправляется субъектами в пуле субъектов.

Глядя на эту вещь, которую я написал, ее следует переписать, чтобы использовать become для изменения поведения, когда пул актеров заполнен (вместо фильтров if в каждом случае, но я оставляю это как упражнение для читателя.

Тогда ваш Iteratee будет выглядеть так

Iteratee.foldM[Work, SendMeMoreWork.type](SendMeMoreWork) {
  case (_, work) => workDistributor ? IterateeWork(work)
}
person Asa    schedule 21.01.2016