Кафка-потребитель. commitSync против commitAsync

Цитата из

Не могли бы вы подробно пояснить разницу между commitSync и commitAsync?
Также, пожалуйста, предоставьте варианты использования, когда какой тип фиксации мне следует предпочесть.


person gstackoverflow    schedule 03.10.2017    source источник


Ответы (4)


Как сказано в документации API:


Это синхронная фиксация, которая будет блокироваться до тех пор, пока фиксация не будет успешной или не возникнет неисправимая ошибка (в этом случае она будет передана вызывающей стороне).

Это означает, что commitSync - это метод блокировки. Его вызов заблокирует ваш поток до тех пор, пока он не завершится успешно или не завершится ошибкой.

Например,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

Для каждой итерации в цикле for только после успешного возврата или прерывания consumer.commitSync() с созданием исключения ваш код перейдет к следующей итерации.


Это асинхронный вызов, который не блокируется. Любые обнаруженные ошибки либо передаются в функцию обратного вызова (если таковая имеется), либо отбрасываются.

Это означает, что commitAsync - это неблокирующий метод. Вызов этого не заблокирует ваш поток. Вместо этого он продолжит обработку следующих инструкций, независимо от того, удастся ли он в конечном итоге или нет.

Например, аналогично предыдущему примеру, но здесь мы используем commitAsync:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

Для каждой итерации цикла for, независимо от того, что в конечном итоге произойдет с consumer.commitAsync(), ваш код перейдет к следующей итерации. И результат фиксации будет обрабатываться функцией обратного вызова, которую вы определили.


Компромиссы: задержка и согласованность данных

  • Если вам нужно обеспечить согласованность данных, выберите commitSync(), потому что это гарантирует, что перед выполнением каких-либо дальнейших действий вы будете знать, успешна ли фиксация смещения или нет. Но поскольку это синхронизация и блокировка, вы потратите больше времени на ожидание завершения фиксации, что приведет к высокой задержке.
  • Если вас устраивает определенная несогласованность данных и вы хотите иметь низкую задержку, выберите commitAsync(), потому что это не будет ждать завершения. Вместо этого он просто отправит запрос на фиксацию и позже обработает ответ от Kafka (успех или неудача), а между тем ваш код продолжит выполнение.

Это все, вообще говоря, фактическое поведение будет зависеть от вашего фактического кода и того, где вы вызываете метод.

person fluency03    schedule 15.01.2018
comment
Consumer.poll устарел, как бы вы преобразовали данные в ConsumerRecords? - person Tiago Medici; 09.01.2020
comment
Также стоит упомянуть, что callback из commitAsync(callback) будет вызываться в том же потоке, что и consumer.poll(). - person Kewei Shang; 21.06.2020

Надежная обработка повторов с помощью commitAsync ()

В книге Kafka - The Definitive Guide есть подсказка о том, как смягчить потенциальную проблему фиксации более низких смещений из-за асинхронной фиксации:

Повторная попытка асинхронных попыток. Простым шаблоном для получения правильного порядка фиксации для асинхронных повторных попыток является использование монотонно увеличивающегося порядкового номера. Увеличивайте порядковый номер каждый раз при фиксации и добавляйте порядковый номер во время фиксации в обратный вызов commitAsync. Когда вы будете готовы отправить повторную попытку, проверьте, равен ли порядковый номер фиксации, полученный обратным вызовом, переменной экземпляра; если это так, значит, более новой фиксации не было, и можно безопасно повторить попытку. Если порядковый номер экземпляра выше, не повторяйте попытку, потому что уже была отправлена ​​более новая фиксация.

В следующем коде показано возможное решение:

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}
person mike    schedule 10.10.2020

И в commitSync, и в commitAsync используется функция управления смещением kafka, и у обоих есть недостатки. Если обработка сообщения завершается успешно и смещение фиксации не выполнено (не атомарно), и в то же время происходит повторная балансировка раздела, ваше обработанное сообщение снова обрабатывается (дублированная обработка) другим потребителем. Если вас устраивает обработка дублированных сообщений, вы можете использовать commitAsync (потому что он не блокирует и не обеспечивает низкую задержку, а также обеспечивает фиксацию более высокого порядка. Так что все должно быть в порядке). В противном случае перейдите к настраиваемому управлению смещением, которое заботится об атомарности при обработке и обновлении смещения (используйте внешнее хранилище смещения)

person Charls Joseph    schedule 04.02.2019

commitAync не будет повторять попытку, потому что, если она повторяет попытку, это приведет к беспорядку.

Представьте, что вы пытаетесь зафиксировать смещение 20 (асинхронно), но оно не было зафиксировано (неудачно), а затем следующий блок опроса пытается зафиксировать смещение 40 (асинхронно), и это удалось.

Теперь смещение коммита 20 все еще ожидает фиксации, если оно вернется и завершится успешно, это приведет к беспорядку.

Беспорядок в том, что зафиксированное смещение должно быть 40, а не 20.

person Moha    schedule 31.05.2020
comment
см. мой ответ, чтобы обойти эту потенциальную проблему. - person mike; 10.10.2020