Первые результаты преобразования GroupByKey

Как я могу заставить GroupByKey вызывать ранние результаты, а не ждать прибытия всех данных (что в моем случае довольно долгое время). Я попытался разбить мою входную коллекцию PCollection на окна с помощью раннего триггера, но этого не произошло. т работать. Он по-прежнему ожидает поступления всех данных, прежде чем выдать результаты.

PCollection<List<String>> input = ...
PCollection<KV<Integer,List<String>>> keyedInput = input.apply(ParDo.of(new AddArbitraryKey()))
keyedInput.apply(Window<KV<Integer,List<String>>>into(
          FixedWindows.of(Duration.standardSeconds(1)))
         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
         .withAllowedLateness(Duration.ZERO).discardingFiredPanes())
 .apply(GroupByKey.<Integer,List<String>>create())
       .apply(ParDo.of(new RemoveArbitraryKey()))
       .apply(ParDo.of(new FurtherProcessing())

Я делаю это, чтобы предотвратить слияние. Преобразование AddArbitraryKey выводит свои элементы с отметкой времени. Однако GroupByKey удерживает все до тех пор, пока не будут получены все данные (для всех окон). Может кто-нибудь, пожалуйста, скажите мне, как я могу заставить его срабатывать раньше. Спасибо .


person Venky    schedule 20.02.2018    source источник
comment
Я только что столкнулся с той же проблемой, простое фиксированное окно не помогает, Combine Per Key ждет до конца ограниченной коллекции. Вы нашли решение?   -  person Maksim Kolchin    schedule 10.08.2018


Ответы (2)


Вы можете установить триггер типа

Repeatedly
  .forever(AfterProcessingTime
    .pastFirstElementInPane()
    .plusDuration(Duration.standardMinutes(1))
  .orFinally(AfterWatermark.pastEndOfWindow())
  .discardingFiredPanes()

Or

AfterWatermark.pastEndOfWindow()
  .withEarlyFirings(
    AfterProcessingTime
      .pastFirstElementInPane()
      .plusDuration(Duration.standardMinutes(1))
person Jiayuan Ma    schedule 20.02.2018
comment
У меня это не сработало. Преобразование GroupBy отправляет результаты только после прибытия всех элементов. Я применил Windowing и триггеры к коллекции PCollection ПЕРЕД преобразованием groupby. Это верно ? Мое намерение состоит в том, чтобы, как только все элементы в пределах временного окна прибудут, GroupBy должен выдать свои результаты для этого окна. Я ошибаюсь? Или не могли бы вы рассказать мне, как я могу этого добиться? - person Venky; 21.02.2018
comment
Вы пробовали .plusDuration(Duration.ZERO)? Кроме того, каков ваш источник данных? Это пакетный источник данных? - person Jiayuan Ma; 21.02.2018
comment
Да, пробовал Duration.ZERO. Да, это пакетный источник данных, а не потоковая передача. Итак, коллекция PCollection ограничена. Будет ли работать в этом случае? - person Venky; 21.02.2018
comment
Пакетный конвейер обычно не требует окон. GBK будет ждать всех данных до выдачи вывода. Если вам по-прежнему требуется потоковая передача (управление окнами и запуском), вам необходимо ввести метку времени события (а не время обработки) для каждого элемента ввода, используя outputWithTimestamp. - person Jiayuan Ma; 21.02.2018
comment
Я использовал outputWithTimestamp внутри своего ParDo следующим образом: processContext.outputWithTimestamp (id, new Instant ()); . Это верно ? Будет ли GBK ждать всех данных, даже если Windows и триггеры используются в ограниченной коллекции PCollection - person Venky; 21.02.2018
comment
new Instant() - время обработки. Вам нужно использовать время события, чтобы окна и триггеры были более значимыми. - person Jiayuan Ma; 22.02.2018

Чтобы предотвратить слияние, лучше использовать преобразование Reshuffle.viaRandomKey(), которое работает лучше и не вносит дополнительных задержек срабатывания.

person jkff    schedule 20.02.2018
comment
Преобразование GroupBy в Reshuffle становится узким местом, т. Е. Ожидает прибытия всех элементов. Я установил временные метки для элементов и применил их к окну (как показано во фрагменте кода в моем вопросе) перед применением Reshuffle.viaRandomKey. Мое намерение состоит в том, чтобы, как только все элементы в пределах временного окна прибудут, GroupBy должен выдать свои результаты для этого окна. Я ошибаюсь? - person Venky; 21.02.2018
comment
PCollection, с которым я работаю, представляет собой ограниченную коллекцию PCollection (пакетного конвейера). Это имеет значение? - person Venky; 21.02.2018
comment
Пакетные конвейеры оптимизируют пропускную способность, а не задержку, и не имеют отслеживания водяных знаков (отчасти потому, что типичные источники пакетных данных, такие как файлы, не имеют очевидного порядка меток времени и не могут предоставить какую-либо полезную оценку водяных знаков), поэтому GroupByKey эффективно буферизует все данные и запускает все окна, когда все данные будут получены. Это проявляется как проблема производительности вашего конвейера? - person jkff; 21.02.2018
comment
Да, в моем сценарии: источником является веб-сервис, который программа потока данных обращается (изнутри ParDo) для получения записей. У этого ParDo есть цикл внутри него, внутри которого он вызывает веб-сервис. Веб-сервис запрашивает базу данных на своем конце и возвращает данные последовательно, но партиями, используя курсор. Следовательно, этот конкретный ParDo работает от одного рабочего. Это имеет большое количество разветвлений, поскольку output () вызывается в цикле для каждой записи, возвращаемой в пакете. Следующий ParDo, который обрабатывает эти записи, не масштабируется в соответствии с количеством записей, выводимых первым ParDo. Слияние ? - person Venky; 22.02.2018
comment
Я подозреваю, что может происходить слияние, которое препятствует масштабированию следующего ParDo, и, следовательно, чтобы сломать его, хотел сгруппировать / разгруппировать. Но если GBK ожидает прибытия всех данных, это предотвращает передачу уже поступивших записей в следующий ParDo. (Веб-сервису требуется много времени для завершения всех своих пакетов, и пока это не произойдет, ни одна из записей не будет обрабатываться дальше) - person Venky; 22.02.2018
comment
Да, в вашем случае слияние - это плохо, но и отсутствие конвейерной обработки между стадиями - тоже плохо. Вы можете попробовать запустить то же задание с помощью обработчика потоковой передачи данных (--streaming = true). Конечно, лучше всего было бы, если бы можно было просто распараллелить цикл внутри исходного ParDo и / или распараллелить запросы к базе данных. - person jkff; 22.02.2018