Стратегия наблюдения за потоком Akka Kafka не работает

Я запускаю приложение Akka Streams Kafka, и я хочу включить стратегию наблюдения в потребителя потока, чтобы, если брокер выходит из строя, а потребитель потока умирает после тайм-аута остановки, супервизор может перезапустить потребителя.

Вот мой полный код:

UserEventStream:

import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer

class UserEventStream extends Actor {

  val settings = Settings(context.system).KafkaConsumers
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()

  override def preStart(): Unit = {
    super.preStart()
    println("Starting UserEventStream....s")
  }
  override def receive = {
    case "start" =>
      val consumerConfig = settings.KafkaConsumerInfo
      println(s"ConsumerConfig with $consumerConfig")
      startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    println(s"startStreamConsumer with config $config")

    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")

    println("START: The UserEventStream processing")
    val future =
      consumerSource
        .mapAsync(parallelism = 50) { message =>
          val m = s"${message.record.value()}"
          messageProcessor ? m
        }
        .runWith(consumerSink)
    future.onComplete {
      case Failure(ex) =>
        println("FAILURE : The UserEventStream processing, stopping the actor.")
        self ! PoisonPill
      case Success(ex) =>
    }
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}  

StreamProcessorSupervisor (это класс супервизора класса UserEventStream):

import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._

object StreamProcessorSupervisor {
  final case object StartSimulator
  final case class StartClient(id: String)
  def props(implicit materializer: ActorMaterializer) =
    Props(classOf[StreamProcessorSupervisor], materializer)
}

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props(classOf[UserEventStream])
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          childProps,
          childName = "usereventstream",
          minBackoff = 1.second,
          maxBackoff = 1.minutes,
          randomFactor = 0.2
        )
      )
      context.actorOf(supervisor, name = s"$id-backoff-supervisor")
      val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
      userEventStrean ! "start"
  }
}

App (основной класс приложения):

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer

object App extends App {

  implicit val system = ActorSystem("stream-test")
  implicit val materializer = ActorMaterializer()

  system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}

application.conf:

kafka {

  consumer {

    num-consumers = "1"
    c1 {
      bootstrap-servers = "localhost:9092"
      bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
      groupId = "localakkagroup1"
      subscription-topic = "test"
      subscription-topic = ${?SUBSCRIPTION_TOPIC1}
      message-type = "UserEventMessage"
      poll-interval = 50ms
      poll-timeout = 50ms
      stop-timeout = 30s
      close-timeout = 20s
      commit-timeout = 15s
      wakeup-timeout = 10s
      max-wakeups = 10
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
      }
    }
  }
}

После запуска приложения я намеренно убил брокера Kafka, а затем обнаружил, что через 30 секунд актер останавливает себя, посылая отравленную таблетку. Но, как ни странно, он не перезапускается, как указано в стратегии BackoffSupervisor.

В чем может быть проблема?


person dks551    schedule 06.10.2017    source источник


Ответы (1)


В вашем коде есть два экземпляра UserEventStream: один - это дочерний субъект, который BackoffSupervisor внутренне создает с Props, который вы ему передаете, а другой - это val userEventStrean, который является дочерним элементом StreamProcessorSupervisor. Вы отправляете сообщение "start" второму, тогда как вы должны отправлять это сообщение первому.

Вам не нужен val userEventStrean, потому что BackoffSupervisor создает дочернего актера. Сообщения, отправленные на BackoffSupervisor, пересылаются дочернему элементу, поэтому, чтобы отправить "start" сообщение дочернему элементу, отправьте его BackoffSupervisor:

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props[UserEventStream]
      val supervisorProps = BackoffSupervisor.props(...)
      val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
      supervisor ! "start"
  }
}

Другая проблема заключается в том, что когда актер получает PoisonPill, это не то же самое, что актер, генерирующий исключение. Следовательно, Backoff.onFailure не сработает, когда UserEventStream отправит себе PoisonPill. PoisonPill останавливает актера, поэтому используйте вместо него Backoff.onStop:

val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop( // <--- use onStop
    childProps,
    ...
  )
)
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
supervisor ! "start"
person Jeffrey Chung    schedule 06.10.2017
comment
даже после внесения вышеуказанных изменений супервизор не отправляет стартовое сообщение дочернему элементу после того, как ребенок остановлен. Хотя ребенок снова создан. Как руководитель может послать сообщение ребенку - person dks551; 06.10.2017