Может ли потребитель Kafka (0.8.2.2) читать сообщения в пакетном режиме

Насколько я понимаю, потребитель Kafka последовательно читает сообщения из назначенного раздела ...

Мы планируем иметь несколько потребителей Kafka (Java), которые будут иметь ту же группу, что и я ... поэтому, если он будет читать последовательно из назначенного раздела, то как мы можем достичь высокой пропускной способности ... т.е. Например, производитель публикует сообщения типа 40 в секунду ... Сообщение потребительского процесса 1 в секунду .. хотя у нас может быть несколько потребителей, но не может быть 40 rt ??? Поправьте меня если я ошибаюсь...

И в нашем случае потребитель должен зафиксировать смещение только после того, как сообщение будет успешно обработано .. иначе сообщение будет повторно обработано ... Есть ли лучшее решение ???


person shiv455    schedule 25.02.2016    source источник
comment
Ваш вопрос немного двусмысленен, ваш заголовок, кажется, предполагает, что вы спрашиваете, может ли потребитель Kafka читать в пакетном режиме, но ваш фактический вопрос больше касается того, как сбалансировать определенные требования к обработке в Kafka. Вы хотите знать, могут ли потребители отправлять сообщения в пакетном режиме? Или вы спрашиваете, как обойти ваши требования к производителю (40 в секунду) и потребителю (1 в секунду) по времени / обработке?   -  person Morgan Kenyon    schedule 25.02.2016
comment
я предполагаю, что мой вопрос связан с заголовком ... я попытаюсь объяснить, почему ... если один потребитель kafka читает сообщения в пакете, я могу достичь такой же пропускной способности, что и производитель ... поэтому в моем вопросе я спрашивал, как достичь высокой пропускной способности с каждым потребителем ... это может произойти только в том случае, если каждый потребитель будет читать сообщения партиями ... но, как я понимаю, потребитель kafka читает сообщения последовательно из раздела ...   -  person shiv455    schedule 25.02.2016


Ответы (3)


На основании вашего уточнения вопроса.

Потребитель Kafka может читать несколько сообщений одновременно. Но Kafka Consumer на самом деле не читает сообщения, правильнее сказать, что Consumer читает определенное количество байтов, а затем, исходя из размера отдельных сообщений, определяет, сколько сообщений будет прочитано. При чтении конфигураций потребителя Kafka вы не можете указать, сколько сообщений нужно получать. , вы указываете максимальный / минимальный размер данных, которые может получить потребитель. Сколько бы сообщений ни поместилось в этот диапазон, столько и вы получите. Вы всегда будете получать сообщения последовательно, как вы указали.

Связанные потребительские конфигурации (для версии 0.9.0.0 и выше)

  • fetch.min.bytes
  • max.partition.fetch.bytes

ОБНОВЛЕНИЕ

Используя ваш пример в комментариях, «я понимаю, что если я указываю в config читать 10 байтов, и если каждое сообщение составляет 2 байта, потребитель читает 5 сообщений за раз». Это правда. Ваше следующее утверждение, «это означает, что смещения этих 5 сообщений были случайными с in partition», неверно. Последовательное чтение не означает одно за другим, это просто означает, что они остаются упорядоченными. Вы можете группировать товары и сохранять их последовательными / упорядоченными. Возьмем следующие примеры.

В журнале Kafka, если есть 10 сообщений (каждые 2 байта) со следующими смещениями, [0,1,2,3,4,5,6,7,8,9].

Если вы прочитаете 10 байт, вы получите пакет, содержащий сообщения со смещениями [0,1,2,3,4].

Если вы прочитаете 6 байтов, вы получите пакет, содержащий сообщения со смещениями [0,1,2].

Если вы прочитаете 6 байтов, а затем еще 6 байтов, вы получите два пакета, содержащие сообщения [0,1,2] и [3,4,5].

Если вы прочитаете 8 байтов, а затем 4 байта, вы получите два пакета, содержащие сообщения [0,1,2,3] и [4,5].

Обновление: уточнение фиксации

Я не уверен на 100%, как работает коммит, в основном я работал с Kafka из среды Storm. Предоставленный KafkaSpout автоматически фиксирует сообщения Kafka.

Но просматривая 0.9.0.1 Потребительские API, которые я бы порекомендовал вам. Похоже, что к этому обсуждению относятся, в частности, три метода.

  • опрос (длительный тайм-аут)
  • commitSync ()
  • commitSync (смещения java.util.Map)

Метод опроса извлекает сообщения, может быть только 1, может быть 20, например, в вашем примере было возвращено 3 сообщения [0,1,2]. Теперь у вас есть эти три сообщения. Теперь вам решать, как их обрабатывать. Вы можете обработать их 0 => 1 => 2, 1 => 0 => 2, 2 => 0 => 1, это просто зависит. Как бы вы ни обрабатывали их, после обработки вы захотите выполнить фиксацию, которая сообщает серверу Kafka, что вы закончили с этими сообщениями.

Использование commitSync () фиксирует все, что было возвращено в последнем опросе, в этом случае фиксируются смещения [0,1,2].

С другой стороны, если вы решите использовать commitSync (смещения java.util.Map), вы можете вручную указать, какие смещения фиксировать. Если вы обрабатываете их по порядку, вы можете обработать смещение 0, затем зафиксировать его, обработать смещение 1, затем зафиксировать его, наконец, обработать смещение 2 и зафиксировать.

В общем, Kafka дает вам свободу обрабатывать сообщения так, как вам нужно, вы можете обрабатывать их последовательно или полностью случайным образом по вашему выбору.

person Morgan Kenyon    schedule 25.02.2016
comment
это меня смущает, когда вы говорите, что потребитель Kafka может читать несколько сообщений за раз, насколько я понимаю, если я укажу в конфигурации, чтобы читать 10 байтов, и если каждое сообщение составляет 2 байта, потребитель читает 5 сообщений за раз ... это означает смещения этих 5 сообщений были случайными с in partition ... ??? и вы также сказали, что вы всегда будете получать сообщения последовательно, как вы указали, это означает, что даже если потребитель сможет получить 5 сообщений, но каждое сообщение приходит к потребителю (потребитель сможет читать) одно за другим? тогда первое утверждение противоречиво. поправьте меня, если я ошибаюсь - person shiv455; 25.02.2016
comment
Обновленный ответ, дайте мне знать, если это поможет. - person Morgan Kenyon; 25.02.2016
comment
Извините за задержку ... хорошо, я понял, что теперь давайте возьмем 3-й пример. Я прочитал 6 байтов, то есть 3 сообщения со смещениями [0,1,2] .. Я получаю эти сообщения в виде массива .. И потребитель должен их обработать последовательно или параллельно ... В любом случае, как зафиксировать смещение (вручную или автоматически), я имею в виду взять 1-е сообщение, которое я прочитал со смещения 0, как только я обработаю его до того числа, которое я должен зафиксировать смещение ... чтобы оно больше не обрабатывалось ???? - person shiv455; 25.02.2016
comment
Обновленный ответ снова. - person Morgan Kenyon; 25.02.2016
comment
Таким образом, я могу обрабатывать все пакетные сообщения, которые потребитель читает за раз, я имею в виду параллельно и вызываю commitSync (), чтобы все сообщения были зафиксированы параллельно ??? - person shiv455; 25.02.2016
comment
Так говорят API, хотя сам я их никогда не использовал. Убедитесь, что вы используете API 0.9.x.x. - person Morgan Kenyon; 25.02.2016
comment
Вы имеете в виду, что только эта версия API имеет только эти 3 метода ... Я думаю, даже в старой версии есть ручная фиксация - person shiv455; 25.02.2016
comment
Да, потребитель был изменен в версии 0.9.0.0, так что до этого все было по-другому. - person Morgan Kenyon; 25.02.2016
comment
Если есть 3 сообщения, каждые 3 байта, вы получите 3 сообщения. Если есть одно 3-байтовое сообщение и одно 8-байтовое сообщение, вы будете использовать только первое сообщение, потому что 3 + 8 = 11 не помещается в 10. - person Marko Bonaci; 26.02.2016
comment
@ morganw09dev все вышеперечисленные функции (пакетное чтение, фиксация смещений в kafka) не могут быть достигнуты, если я использую потребительскую версию 0.8.2.2 ??? я действительно согласен .. пожалуйста, поясните .. - person shiv455; 26.02.2016
comment
Это верно, Kafka переписал своего потребителя для версии 0.9.x, поэтому мой ответ применим только к потребителям версии 0.9.x. - person Morgan Kenyon; 26.02.2016
comment
@ morganw09dev 1) я могу видеть fetch.min.bytes ... в kafka.apache .org / 082 / documentation.html # consumerconfigs, но не max.partition.fetch.bytes, и в моем случае я могу установить только fetch.min.bytes = 10, чтобы за раз потребитель читал 5 сообщений (размер 2 байта) ... ?? 2) вы имеете в виду, что я не могу вручную фиксировать смещения в потребительской версии 0.8.2.2 ??? 3) И какой api (простой или высокоуровневый потребитель) я должен использовать для выполнения моей цели в версии 0.8.2.2? потому что для использования потребителя версии 0.9 мы должны обновить наших корпоративных брокеров, что может не произойти в ближайшем будущем .. пожалуйста, предложите !! - person shiv455; 26.02.2016
comment
Я думаю, тебе придется разобраться в этом самостоятельно. Просмотрите сами api, посмотрите, что вам предлагают и как их можно использовать. Вы ставите себе в затруднительное положение, ожидая, что я или кто-то другой решу все наши проблемы с программированием. - person Morgan Kenyon; 26.02.2016
comment
v0.8 работает точно так же, как описано в @ morganw09dev, в отношении fetch.min.bytes. - person Marko Bonaci; 26.02.2016

Чтобы достичь параллелизма, о котором вы, кажется, и спрашиваете, вы используете разделы по темам (вы разбиваете тему на N частей, которые называются разделами). Затем в потребителе вы создаете несколько потоков для использования из этих разделов.

На стороне производителя вы публикуете сообщения в случайном разделе (по умолчанию) или предоставляете Kafka некоторый атрибут сообщения для вычисления хэша (если требуется упорядочение), что гарантирует, что все сообщения с одинаковым хешем попадают в один и тот же раздел.

ИЗМЕНИТЬ (пример запроса фиксации смещения):
Вот как я это сделал. Все методы, которые не предусмотрены, несущественны.

 /**
   * Commits the provided offset for the current client (i.e. unique topic/partition/clientName combination)
   * 
   * @param offset
   * @return {@code true} or {@code false}, depending on whether commit succeeded
   * @throws Exception
   */
  public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer,
      long offset) throws Exception {
    try {
      TopicAndPartition tap = new TopicAndPartition(topic, partition);
      OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
      Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
      mapForCommitOffset.put(tap, offsetMetaAndErr);

      kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
          ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
          ConsumerContext.getOffsetStorageType());

      OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
      Short errCode = (Short) offsetCommitResp.errors().get(tap);
      if (errCode != 0) {
        processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
        ErrorMapping.maybeThrowException(errCode);
      }
      LOG.debug("Successfully committed offset [{}].", offset);
    } catch (Exception e) {
      LOG.error("Error while committing offset [" + offset + "].", e);
      throw e;
    }
    return true;
  }
person Marko Bonaci    schedule 25.02.2016
comment
Хорошо для производителя ... какое преимущество я получу, если укажу сообщения, которые будут помещены в конкретный раздел вместо раздела по умолчанию ... в любом случае, как потребитель, я просто передам имя темы ... - person shiv455; 26.02.2016
comment
И другой вопрос 1) в каком формате сообщение хранится в разделе .. все, что я знаю, производитель отправляет сообщение в формате Json в Kafka - person shiv455; 26.02.2016
comment
Первый вопрос: это используется, когда вам нужно убедиться, что сообщения будут использоваться в том же порядке, в котором они были получены / созданы. Допустим, у вас есть, например, PersonTransaction сообщения, вы должны хешировать их personId, чтобы транзакции одного и того же человека всегда попадали в один и тот же раздел. - person Marko Bonaci; 26.02.2016
comment
Второй вопрос: у брокеров сообщения хранятся в двоичном формате. - person Marko Bonaci; 26.02.2016
comment
Для первого примера вопроса, который вы указали, у меня есть аналогичное требование ... Могу я упомянуть список операций (например, 3), которые должны выполняться в одном сообщении, чтобы, если третья операция не удалась, я вернул первые 2 операции и я ' Не буду фиксировать сообщение, поэтому оно будет повторено ... - person shiv455; 26.02.2016
comment
Нет, у нас 0.8.2.2, нельзя ли использовать потребителя низкого / высокого уровня ??? Действительно ли нам нужно переходить на 0.9, потому что для этого также требуется обновление брокеров? - person shiv455; 26.02.2016
comment
вы имеете в виду, что я не могу зафиксировать смещение в kafka, если я использую версию потребителей 0.8.2.2 ??? это действительно удивительно - person shiv455; 26.02.2016
comment
Вам не нужно мигрировать, у вас есть низкоуровневый потребитель в 0.8, где у вас есть полный контроль над тем, когда фиксировать смещения. Таким образом, вы просто убедитесь, что вы сначала делаете все, что вам нужно сделать с сообщением, и только затем фиксируете смещение. - person Marko Bonaci; 26.02.2016
comment
Хорошо, но в потребительском API версии 0.9 уже есть метод commitSync .. которого нет в старых версиях .. как выполнить ручную фиксацию в пользовательских API 0.8.2.X ?? пожалуйста, дайте мне знать, есть ли какой-либо образец - person shiv455; 26.02.2016
comment
Я отредактировал свой ответ, включив в него пример фиксации смещения. Пожалуйста, закройте этот вопрос. - person Marko Bonaci; 26.02.2016
comment
Во время производства я вставляю в раздел по умолчанию ... как мой потребитель узнает, в какой раздел он должен выполнить фиксацию в приведенном выше коде ?? И вот здесь имя клиента ... имя потребителя ?? - person shiv455; 26.02.2016
comment
Мы объединим всех потребителей в одну группу, которую я бы - person shiv455; 26.02.2016
comment
Здесь нет групп, это низкоуровневый потребитель. Я думаю, вам нужно написать код и опубликовать части кода, когда вы застряли (в новом вопросе, где вы разместите ссылку на этот вопрос). - person Marko Bonaci; 28.02.2016

Вы можете использовать сообщения в пакетном режиме и обрабатывать их в пакетном режиме. batch.max.wait.ms (свойство) потребитель будет ждать это время и опрашивает новое сообщение

person charan teja    schedule 27.07.2018