Как связать информацию о происхождении с результатом ExecuteSQL в NiFi?

Я выполняю серию запросов после перечисления таблиц в базе данных, в основном делаю дампы базы данных с добавлением некоторых умов.

Когда данные поступают из ExecuteSql процессора, они имеют формат Avro. Я могу использовать ConvertAvroToJson для преобразования в JSON. Затем я могу отправить этот JSON в другое место. Большой!

Однако мне нужно больше информации, встроенной в этот документ JSON. Я минимально хочу:

  1. имя таблицы, к которой был выполнен запрос,
  2. база данных DSN (без учетных данных, вероятно, жестко запрограммирована в конфигурации, потому что я не думаю, что она доступна из полей языка выражения NiFi),
  3. выполненный запрос, сгенерировавший запись,
  4. номер записи в наборе запроса (общее количество строк уже является атрибутом).
  5. Произвольная информация, полученная из переменных среды или файла конфигурации, в противном случае возвращается к жестко запрограммированной в UpdateAttributes процессоре

ExecuteSQL, похоже, не предоставляет эту информацию, но кажется, что он копирует атрибуты из входного файла потока. Я мог бы поместить часть этой информации во входные атрибуты потокового файла через UpdateAttributes ранее в конвейере. Если это возможно, как я могу объединить вывод JSON из ExecuteSQL -> ConvertAvroToJSON с атрибутами, возможно, выводимый из процессора AttributesToJson?


person Colin Dean    schedule 08.01.2018    source источник
comment
Я действительно стараюсь избегать написания собственных процессоров.   -  person Colin Dean    schedule 09.01.2018


Ответы (1)


Пара комментариев:

1) Вы используете ListDatabaseTables первым? Если это так, у вас будут атрибуты в вашем потоковом файле, включая имя таблицы, имя базы данных и т. Д.

2) Эта информация в настоящее время недоступна для процессора, API DBCPService предоставляет только JDBC Подключение. Возможно, вы могли бы использовать скриптовый процессор, такой как ExecuteScript, для выполнения SQL (у меня есть пример на мой блог) и получите доступ к метаданным, но я не уверен, что там будет вся необходимая информация. Информация о происхождении этого процессора также не будет указана, поскольку мы не сохраняем транзитный URI для события происхождения. Нам следует подумать о том, чтобы сделать хотя бы транзитный URI (минус важные значения) доступным для процессора.

3) Если вы жестко кодируете SQL-запрос в процессоре ExecuteSQL, вы также можете заранее жестко записать его в атрибут с помощью UpdateAttribute. Если файл потока содержит запрос SQL, вы можете использовать ExtractText, чтобы поместить его в атрибут. Мы также должны улучшить процессор, чтобы добавить запрос в качестве атрибута (это должно быть необязательно, поскольку некоторые запросы могут быть довольно большими).

4) Хотите ли вы, чтобы каждая запись разделялась с ее «номером строки» в качестве атрибута? После SplitAvro у вас будет атрибут fragment.index, который будет отсчитываемым от нуля числом, связанным с каждой записью.

5) Вы можете использовать Expression Language в UpdateAttribute, поэтому в сочетании с Реестр переменных вы можете читать значения из переменных среды или файла реестра. Вас также может заинтересовать PropertiesFileLookupService.

person mattyb    schedule 08.01.2018