На основании вашего уточнения вопроса.
Потребитель Kafka может читать несколько сообщений одновременно. Но Kafka Consumer на самом деле не читает сообщения, правильнее сказать, что Consumer читает определенное количество байтов, а затем, исходя из размера отдельных сообщений, определяет, сколько сообщений будет прочитано. При чтении конфигураций потребителя Kafka вы не можете указать, сколько сообщений нужно получать. , вы указываете максимальный / минимальный размер данных, которые может получить потребитель. Сколько бы сообщений ни поместилось в этот диапазон, столько и вы получите. Вы всегда будете получать сообщения последовательно, как вы указали.
Связанные потребительские конфигурации (для версии 0.9.0.0 и выше)
- fetch.min.bytes
- max.partition.fetch.bytes
ОБНОВЛЕНИЕ
Используя ваш пример в комментариях, «я понимаю, что если я указываю в config читать 10 байтов, и если каждое сообщение составляет 2 байта, потребитель читает 5 сообщений за раз». Это правда. Ваше следующее утверждение, «это означает, что смещения этих 5 сообщений были случайными с in partition», неверно. Последовательное чтение не означает одно за другим, это просто означает, что они остаются упорядоченными. Вы можете группировать товары и сохранять их последовательными / упорядоченными. Возьмем следующие примеры.
В журнале Kafka, если есть 10 сообщений (каждые 2 байта) со следующими смещениями, [0,1,2,3,4,5,6,7,8,9].
Если вы прочитаете 10 байт, вы получите пакет, содержащий сообщения со смещениями [0,1,2,3,4].
Если вы прочитаете 6 байтов, вы получите пакет, содержащий сообщения со смещениями [0,1,2].
Если вы прочитаете 6 байтов, а затем еще 6 байтов, вы получите два пакета, содержащие сообщения [0,1,2] и [3,4,5].
Если вы прочитаете 8 байтов, а затем 4 байта, вы получите два пакета, содержащие сообщения [0,1,2,3] и [4,5].
Обновление: уточнение фиксации
Я не уверен на 100%, как работает коммит, в основном я работал с Kafka из среды Storm. Предоставленный KafkaSpout автоматически фиксирует сообщения Kafka.
Но просматривая 0.9.0.1 Потребительские API, которые я бы порекомендовал вам. Похоже, что к этому обсуждению относятся, в частности, три метода.
- опрос (длительный тайм-аут)
- commitSync ()
- commitSync (смещения java.util.Map)
Метод опроса извлекает сообщения, может быть только 1, может быть 20, например, в вашем примере было возвращено 3 сообщения [0,1,2]. Теперь у вас есть эти три сообщения. Теперь вам решать, как их обрабатывать. Вы можете обработать их 0 => 1 => 2, 1 => 0 => 2, 2 => 0 => 1, это просто зависит. Как бы вы ни обрабатывали их, после обработки вы захотите выполнить фиксацию, которая сообщает серверу Kafka, что вы закончили с этими сообщениями.
Использование commitSync () фиксирует все, что было возвращено в последнем опросе, в этом случае фиксируются смещения [0,1,2].
С другой стороны, если вы решите использовать commitSync (смещения java.util.Map), вы можете вручную указать, какие смещения фиксировать. Если вы обрабатываете их по порядку, вы можете обработать смещение 0, затем зафиксировать его, обработать смещение 1, затем зафиксировать его, наконец, обработать смещение 2 и зафиксировать.
В общем, Kafka дает вам свободу обрабатывать сообщения так, как вам нужно, вы можете обрабатывать их последовательно или полностью случайным образом по вашему выбору.
person
Morgan Kenyon
schedule
25.02.2016