Как остановить Source.tick в Актере?

У меня есть актер, который каждые 2 секунды производит NotUsed. Может быть, в этом нет никакого смысла, но это только для целей тестирования.

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.sweetsoft.FsmSystem.{Add, StartTicker, StopTicker}

import scala.concurrent.duration._

object AddActor {

  def props: Props = Props(new AddActor)

}

final class AddActor extends Actor with ActorLogging {

  implicit val materilizer = ActorMaterializer()

  private val consumer: NotUsed => Unit = _ =>
    context.parent ! Add

  private val runnable = Source.tick(2.second, 2.second, NotUsed)
    .named("Ticker")
    .toMat(Sink.foreach(consumer))(Keep.both)

  override def receive: Receive = {
    case StartTicker =>

      runnable.run()
    case StopTicker =>

  }
}  

Когда Актер получает сообщение StopTicker, я хотел бы остановить поток. И по StartTicker, затем запустите стрим.

Вызывая метод run(), я бы получил материализованное значение Cancellabel, но к нему невозможно получить доступ в пределах StopTicker.

Что я должен делать?


person softshipper    schedule 11.05.2019    source источник
comment
сохраните это в переменной   -  person Dima    schedule 11.05.2019
comment
Но это невозможно с val только var, верно?   -  person softshipper    schedule 11.05.2019
comment
да, использование var распространено в Actors (это не совсем функционально). Просто убедитесь, что вы никогда не обращаетесь к нему за пределами обработки сообщений. Вроде не закрывать, а ставить на Future.   -  person Dima    schedule 11.05.2019
comment
Действительно, или лучше построить Actor с FSM?   -  person softshipper    schedule 11.05.2019
comment
Я не понимаю, как это связано. FSM имеет состояние, поэтому он по своей сути изменчив. Все, что делает DSL, - это позволяет скрыть изменения в самом указателе состояния за API. Я не думаю, что на практике это имеет значение.   -  person Dima    schedule 12.05.2019


Ответы (1)


Вы можете использовать шаблон becode / unbecome:

final class AddActor extends Actor with ActorLogging {
    import context._

    implicit val materilizer: ActorMaterializer = ActorMaterializer()

    private val consumer: NotUsed => Unit = _ =>
      self ! Add //send Add to self rather than directly to context.parent

    private val runnable = Source.tick(2.second, 2.second, NotUsed)
      .named("Ticker")
      .toMat(Sink.foreach(consumer))(Keep.both)

    override def preStart(): Unit = { //start runnable on actor start
      super.preStart()
      runnable.run()
    }

    val running: Receive = { //when running ignore new StartTicker but handle StopTicker and Add
      case StopTicker =>
        become(paused)

      case Add =>
        context.parent ! Add
    }

    val paused: Receive = { //handle just StartTicker
      case StartTicker =>
        become(running)
    }

    override def receive: Receive = paused //initial receive is paused

}
person Krzysztof Atłasik    schedule 11.05.2019