Конвейер луча не производит никакого вывода после GroupByKey с оконным режимом, и я получил ошибку памяти

цель:

Я хочу загрузить данные потока, затем добавить ключ, а затем посчитать их по ключу.

проблема:

Пиплайн Apache Beam Dataflow получает ошибку памяти, когда я пытаюсь загрузить и сгруппировать по ключу данные большого размера с использованием потокового подхода (неограниченные данные). Потому что кажется, что данные накапливаются в групповом режиме, и он не запускает данные раньше с запуском каждого окна.

Если я уменьшу размер элементов (количество элементов не изменится), все заработает! потому что на самом деле group-by-step ожидает, пока все данные будут сгруппированы, а затем запускает все новые оконные данные.

Я тестировал оба:

версия луча 2.11.0 и версия scio 0.7.4

версия луча 2.6.0 и версия scio 0.6.1

Способ восстановления ошибки:

  1. Прочтите сообщение Pubsub, содержащее имя файла
  2. Чтение и загрузка связанного файла из GCS как построчный итератор
  3. Сглаживайте строку за строкой (так получается около 10 000) элементов.
  4. Добавить метки времени (текущее мгновенное время) к элементам
  5. Создайте пару "ключ-значение" моих данных (с некоторыми случайными целочисленными ключами от 1 до 10)
  6. Применить окно с запуском (оно сработает примерно 50 раз в случае, если строки маленькие и нет проблем с памятью)
  7. Подсчет на ключ (сгруппируйте по ключам, затем объедините их)
  8. Наконец, мы предполагали иметь около 50 * 10 элементов, представляющих счетчики по окну и ключу (успешно протестировано, когда размер строк достаточно мал).

Визуализация трубопровода (шаги с 4 по 7):

введите здесь описание изображения

Резюме для пошагового группирования по клавишам:

введите здесь описание изображения

Как видите, данные накапливаются в группах и не отправляются.

Код окна здесь:

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)

Ошибка:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

Есть ли какое-либо решение для решения проблемы с памятью, возможно, заставив group-by выдавать ранние результаты каждого окна.


person Saeed Mohtasham    schedule 12.04.2019    source источник
comment
Было бы полезно, если бы вы объяснили, что вы хотите, чтобы произошло с вашими данными, а затем мы могли бы обсудить, почему триггер не выполняет то, что вы ожидаете (трудно рассуждать о том, что вы хотите, чтобы произошло только из определения триггера). Кроме того, как обновляется водяной знак перед AddTimestamps (какой источник вы используете, видите ли вы, что водяной знак продвигается вперед)?   -  person Lukasz Cwik    schedule 12.04.2019
comment
@LukaszCwik спасибо, я объяснил свою цель в посте. Я обнаружил, что водяной знак данных перестал продвигаться, когда дошел до этапа группировки по ключам. если я попробую тот же конвейер с небольшим размером данных, поведение группового шага будет таким же: водяной знак останавливается в группировке примерно на 2 минуты, а затем продвигается (поскольку размер данных в порядке и без ошибок) после того, как все данные присутствуют в по группам (вроде накапливаются данные)   -  person Saeed Mohtasham    schedule 15.04.2019
comment
Основываясь на вашем текущем определении триггера, похоже, что вы пытаетесь выводить данные каждый раз, когда у вас есть не менее 10 элементов ИЛИ после того, как вы видели хотя бы 1 элемент и ждали не менее 1 миллисекунды, и вы хотите прекратить производство вывода после того, как окно находится за концом водяного знака. Это похоже на то, что вы хотите? Кроме того, поскольку ваш водяной знак не продвигается, ваше условие НАКОНЕЦ никогда не будет выполнено, какой источник вы используете (Pubsub / Kafka / ...)?   -  person Lukasz Cwik    schedule 15.04.2019
comment
@LukaszCwik да именно! Запуск происходит, но только тогда, когда все данные сгруппированы в собственное окно и ключ. group-by никогда не выводит частичные результаты уже вычисленных панелей, созданных триггером. Мой ввод - это сообщения PubSub, содержащие имена файлов CSV, а затем я читаю и выравниваю файл, чтобы получить коллекцию строк   -  person Saeed Mohtasham    schedule 15.04.2019
comment
Есть ли в вашем сообщении pubsub атрибут отметки времени, который вы можете использовать для водяных знаков, или вы устанавливаете отметку времени для записей только на основе данных, которые вы прочитали из файла CSV?   -  person Lukasz Cwik    schedule 16.04.2019
comment
@LukaszCwik, я устанавливаю метку времени для записей только на основе данных, которые я прочитал из файла CSV. Я помечаю каждый элемент Collection моментом момента, когда он проходит через шаг Add Timestamp. На самом деле я добавил этот шаг, чтобы убедиться, что временные метки сглаженных данных отличаются. но это не решило проблему.   -  person Saeed Mohtasham    schedule 16.04.2019
comment
AddTimestamp устанавливает только временную метку записи, это не влияет на то, как водяной знак продвигается, поскольку это контролируется источником. Есть ли у вас нижняя граница метки времени записей в каждом файле (например, это файлы журнала, в которых есть данные за некоторый фиксированный интервал времени)? Если это так, вы можете добавить к сообщениям pubsub эту временную метку, а затем сообщить источнику Pubsub эту временную метку, и это заставит водяной знак продвигаться.   -  person Lukasz Cwik    schedule 16.04.2019
comment
Также можете ли вы предоставить идентификатор задания для работающего конвейера? Это может помочь кому-то в Google взглянуть и понять, почему конвейер не выполняет то, что вы ожидаете.   -  person Lukasz Cwik    schedule 16.04.2019


Ответы (1)


KeyCommitTooLargeException - это не проблема памяти, а проблема сериализации protobuf. Protobuf имеет ограничение в 2 ГБ для объекта (максимальный размер google protobuf). Поток данных обнаружил, что значение одного ключа в конвейере превышает 2 ГБ, поэтому он не может перетасовать данные. Сообщение об ошибке указывает, что «Это может быть вызвано группировкой очень большого количества данных в одном окне без использования Combine или созданием большого количества данных из одного элемента ввода». В зависимости от настройки вашего конвейера (т. Е. Назначенных случайных ключей) более вероятно второе.

Конвейер мог прочитать большой файл (> 2 ГБ) из GCS и назначить ему случайный ключ. GroupByKey требует операции перетасовки ключей, а Dataflow не удалось выполнить из-за ограничения protobuf, поэтому застрял на этом ключе и удерживает водяной знак.

Если один ключ имеет большое значение, вы можете уменьшить размер значения, например, сжать строку, разделить строку на несколько ключей или сгенерировать файл GCS меньшего размера.

Если большое значение получено из группировки нескольких ключей, вы можете захотеть увеличить пространство ключей, чтобы каждая группа по ключевым операциям в конечном итоге объединяла меньшее количество ключей.

person Andy Xu    schedule 16.04.2019