Как дождаться завершения запросов GenerateTableFetch

Мой вариант использования такой. У меня есть несколько X-таблиц, которые нужно извлечь из MySQL. Я разделяю их с помощью SplitText, чтобы поместить каждую таблицу в отдельный файл потока и извлекать с помощью GenerateTableFetch и ExecuteSQL.

И я хочу получить уведомление или выполнить какое-либо другое действие, когда импорт будет выполнен для всех таблиц. В текстовом процессоре SplitText я перенаправил original отношение к Wait на ${filename} с целевым количеством ${fragment.count}. Это будет отслеживать, сколько таблиц сделано.

Но теперь я не могу понять, как узнать, когда конкретная таблица сделана. GenerateTableFetch разветвляет файл потока на несколько в зависимости от размера раздела. Но он не записывает такие атрибуты, как fragment.count, которые я могу использовать для ожидания каждой таблицы.

Есть ли способ добиться этого? Или, может быть, есть способ узнать в конце всего потока, были ли обработаны все файлы потока в потоке, и ничего не находится в очереди или не обрабатывается?


person pratpor    schedule 15.09.2018    source источник


Ответы (2)


Если у вас есть автономный экземпляр NiFi (или вы не распространяете потоковые файлы между кластером на узлы ExecuteSQL), вы можете вместо этого использовать QueryDatabaseTable, он (по умолчанию) будет выдавать все потоковые файлы только при обработке всего набора результатов. Если все строки входят в один потоковый файл, то тот факт, что потоковый файл был передан вниз по течению, указывает на то, что выборка завершена.

Я написал NIFI-5601, чтобы улучшить добавление атрибутов fragment.* для потоковой передачи файлов, созданных GTF.

person mattyb    schedule 15.09.2018
comment
Привет, Мэтт. Еще раз спасибо, что отметили это как улучшение Jira. Проблема с QueryDatabaseTable заключается в том, что он не разрешает входящие отношения. И у меня есть 100-1000 таблиц, которые нужно тянуть. - person pratpor; 15.09.2018

Пока NiFi не добавил поддержку этого, мне удалось заставить его работать, используя MergeContent. Используйте table_name как Correlation attribute name, а затем используйте отношение merged к процессору Wait, используя ${merge.count} в качестве цели. Обратитесь к скриншотам, если кто-то хочет сделать то же самое.

введите здесь описание изображения

Обработчик MergeContent

Подождите процессор

person pratpor    schedule 18.09.2018