Потребительский зачет alpakka kafka

Я использую Alpakka-kafka в scala для использования темы Kafka. Вот мой код:

    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

Однако потребитель начинает опрос только с первого незафиксированного сообщения в теме. Я хотел бы всегда начинать со смещения 0, независимо от того, какие сообщения фиксируются. Как с потребителем Alpakka указать смещение вручную?


person Vasily802    schedule 03.04.2020    source источник


Ответы (1)


Я думаю, вы хотите добавить пару записей конфигурации:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False, чтобы в вашей работе никогда не сохранялось смещение

  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" так что ваша работа начинается с самого начала.

Если ваша работа уже фиксировала смещения в прошлом, вам, возможно, придется сбросить смещение на самое раннее.

person 0x26res    schedule 17.04.2020