У меня есть актер, который каждые 2 секунды производит NotUsed
. Может быть, в этом нет никакого смысла, но это только для целей тестирования.
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.sweetsoft.FsmSystem.{Add, StartTicker, StopTicker}
import scala.concurrent.duration._
object AddActor {
def props: Props = Props(new AddActor)
}
final class AddActor extends Actor with ActorLogging {
implicit val materilizer = ActorMaterializer()
private val consumer: NotUsed => Unit = _ =>
context.parent ! Add
private val runnable = Source.tick(2.second, 2.second, NotUsed)
.named("Ticker")
.toMat(Sink.foreach(consumer))(Keep.both)
override def receive: Receive = {
case StartTicker =>
runnable.run()
case StopTicker =>
}
}
Когда Актер получает сообщение StopTicker
, я хотел бы остановить поток. И по StartTicker
, затем запустите стрим.
Вызывая метод run()
, я бы получил материализованное значение Cancellabel
, но к нему невозможно получить доступ в пределах StopTicker
.
Что я должен делать?
val
толькоvar
, верно? - person softshipper   schedule 11.05.2019var
распространено вActors
(это не совсем функционально). Просто убедитесь, что вы никогда не обращаетесь к нему за пределами обработки сообщений. Вроде не закрывать, а ставить наFuture
. - person Dima   schedule 11.05.2019Actor
сFSM
? - person softshipper   schedule 11.05.2019