Мне нужно получить последние данные в шаблоне приема данных nifi

Привет сэр,

В шаблоне приема данных мне нужно получить это свойство, например, у меня есть данные с полем даты

дата дата 12-07-2018 a 13-07-2018 b 14-07-2018 c 15-07-2018 d

В этом случае я бы хотел взять последнюю версию, например, 15-07-2018.

если поле даты получило новые данные 16-07-2018 e, тогда мне нужно получить 16-07-2018, проверив дату последнего обновления 15-07-2018, а не проверяя с первой 12-07-2018

вот так, если я получил 17-08-2108 f, то должен получить 17-08-2018, проверив последнюю новую дату 16-07-2018 ..

как этого добиться, в каком процессоре мне нужно внести изменения или добавить новые свойства? Когда канал запускается снова, как он берет последний водяной знак и работает оттуда


person IMRAN S K    schedule 16.07.2018    source источник
comment
Добро пожаловать в SO! Чтобы мы помогли вам подробно рассказать о том, чего вы пытаетесь достичь, приведите примеры того, что вы пробовали (Форматирование помогает!) и объясните, что вы ожидаете увидеть. Взгляните на Руководство по языку выражений по датам и посмотрите, не возникнет ли это каких-либо идей.   -  person Nathan    schedule 17.07.2018


Ответы (1)


На ум приходят два возможных подхода:

  1. Напишите собственное приложение Spark, которое будет использоваться (ExecuteSparkJob) для чтения загружаемого файла. В этом случае вы отслеживаете максимальную дату, и когда вы закончите прием, сохраните ее где-нибудь. Если вы находитесь в мире HDP, легко было бы вставить максимальную дату в таблицу Hive (транзакционную). Вы также можете использовать znode ZooKeeper для сохранения или даже PutDistributedMapCache процессор, который предлагает NiFi.
  2. Напишите собственный процессор NiFi, который в основном будет делать то же самое, что и предыдущий, за исключением того, что вы должны сами разрешить ему работать с данными в другом формате (CSV, JSON). В этом отношении Spark имеет множество встроенных функций.
person Sivaprasanna Sethuraman    schedule 17.07.2018