Кафка-потребитель. commitSync против commitAsync
Не могли бы вы подробно пояснить разницу между commitSync
и commitAsync
?
Также, пожалуйста, предоставьте варианты использования, когда какой тип фиксации мне следует предпочесть.
Ответы (4)
Как сказано в документации API:
Это синхронная фиксация, которая будет блокироваться до тех пор, пока фиксация не будет успешной или не возникнет неисправимая ошибка (в этом случае она будет передана вызывающей стороне).
Это означает, что commitSync
- это метод блокировки. Его вызов заблокирует ваш поток до тех пор, пока он не завершится успешно или не завершится ошибкой.
Например,
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
Для каждой итерации в цикле for только после успешного возврата или прерывания consumer.commitSync()
с созданием исключения ваш код перейдет к следующей итерации.
Это асинхронный вызов, который не блокируется. Любые обнаруженные ошибки либо передаются в функцию обратного вызова (если таковая имеется), либо отбрасываются.
Это означает, что commitAsync
- это неблокирующий метод. Вызов этого не заблокирует ваш поток. Вместо этого он продолжит обработку следующих инструкций, независимо от того, удастся ли он в конечном итоге или нет.
Например, аналогично предыдущему примеру, но здесь мы используем commitAsync
:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
Для каждой итерации цикла for, независимо от того, что в конечном итоге произойдет с consumer.commitAsync()
, ваш код перейдет к следующей итерации. И результат фиксации будет обрабатываться функцией обратного вызова, которую вы определили.
Компромиссы: задержка и согласованность данных
- Если вам нужно обеспечить согласованность данных, выберите
commitSync()
, потому что это гарантирует, что перед выполнением каких-либо дальнейших действий вы будете знать, успешна ли фиксация смещения или нет. Но поскольку это синхронизация и блокировка, вы потратите больше времени на ожидание завершения фиксации, что приведет к высокой задержке. - Если вас устраивает определенная несогласованность данных и вы хотите иметь низкую задержку, выберите
commitAsync()
, потому что это не будет ждать завершения. Вместо этого он просто отправит запрос на фиксацию и позже обработает ответ от Kafka (успех или неудача), а между тем ваш код продолжит выполнение.
Это все, вообще говоря, фактическое поведение будет зависеть от вашего фактического кода и того, где вы вызываете метод.
callback
из commitAsync(callback)
будет вызываться в том же потоке, что и consumer.poll()
.
- person Kewei Shang; 21.06.2020
Надежная обработка повторов с помощью commitAsync ()
В книге Kafka - The Definitive Guide есть подсказка о том, как смягчить потенциальную проблему фиксации более низких смещений из-за асинхронной фиксации:
Повторная попытка асинхронных попыток. Простым шаблоном для получения правильного порядка фиксации для асинхронных повторных попыток является использование монотонно увеличивающегося порядкового номера. Увеличивайте порядковый номер каждый раз при фиксации и добавляйте порядковый номер во время фиксации в обратный вызов commitAsync. Когда вы будете готовы отправить повторную попытку, проверьте, равен ли порядковый номер фиксации, полученный обратным вызовом, переменной экземпляра; если это так, значит, более новой фиксации не было, и можно безопасно повторить попытку. Если порядковый номер экземпляра выше, не повторяйте попытку, потому что уже была отправлена более новая фиксация.
В следующем коде показано возможное решение:
import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}
И в commitSync, и в commitAsync используется функция управления смещением kafka, и у обоих есть недостатки. Если обработка сообщения завершается успешно и смещение фиксации не выполнено (не атомарно), и в то же время происходит повторная балансировка раздела, ваше обработанное сообщение снова обрабатывается (дублированная обработка) другим потребителем. Если вас устраивает обработка дублированных сообщений, вы можете использовать commitAsync (потому что он не блокирует и не обеспечивает низкую задержку, а также обеспечивает фиксацию более высокого порядка. Так что все должно быть в порядке). В противном случае перейдите к настраиваемому управлению смещением, которое заботится об атомарности при обработке и обновлении смещения (используйте внешнее хранилище смещения)
commitAync
не будет повторять попытку, потому что, если она повторяет попытку, это приведет к беспорядку.
Представьте, что вы пытаетесь зафиксировать смещение 20 (асинхронно), но оно не было зафиксировано (неудачно), а затем следующий блок опроса пытается зафиксировать смещение 40 (асинхронно), и это удалось.
Теперь смещение коммита 20 все еще ожидает фиксации, если оно вернется и завершится успешно, это приведет к беспорядку.
Беспорядок в том, что зафиксированное смещение должно быть 40, а не 20.