цель:
Я хочу загрузить данные потока, затем добавить ключ, а затем посчитать их по ключу.
проблема:
Пиплайн Apache Beam Dataflow получает ошибку памяти, когда я пытаюсь загрузить и сгруппировать по ключу данные большого размера с использованием потокового подхода (неограниченные данные). Потому что кажется, что данные накапливаются в групповом режиме, и он не запускает данные раньше с запуском каждого окна.
Если я уменьшу размер элементов (количество элементов не изменится), все заработает! потому что на самом деле group-by-step ожидает, пока все данные будут сгруппированы, а затем запускает все новые оконные данные.
Я тестировал оба:
версия луча 2.11.0 и версия scio 0.7.4
версия луча 2.6.0 и версия scio 0.6.1
Способ восстановления ошибки:
- Прочтите сообщение Pubsub, содержащее имя файла
- Чтение и загрузка связанного файла из GCS как построчный итератор
- Сглаживайте строку за строкой (так получается около 10 000) элементов.
- Добавить метки времени (текущее мгновенное время) к элементам
- Создайте пару "ключ-значение" моих данных (с некоторыми случайными целочисленными ключами от 1 до 10)
- Применить окно с запуском (оно сработает примерно 50 раз в случае, если строки маленькие и нет проблем с памятью)
- Подсчет на ключ (сгруппируйте по ключам, затем объедините их)
- Наконец, мы предполагали иметь около 50 * 10 элементов, представляющих счетчики по окну и ключу (успешно протестировано, когда размер строк достаточно мал).
Визуализация трубопровода (шаги с 4 по 7):
Резюме для пошагового группирования по клавишам:
Как видите, данные накапливаются в группах и не отправляются.
Код окна здесь:
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
Ошибка:
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Есть ли какое-либо решение для решения проблемы с памятью, возможно, заставив group-by выдавать ранние результаты каждого окна.