Результаты слияния процессора ExecuteSQL с содержимым Json в nifi 6.0

Я имею дело с объектами json, содержащими точки географических координат. Я хотел бы запустить эти точки на сервере postgis, который у меня есть локально, чтобы оценить точку в сопоставлении многоугольников.

Я надеюсь сделать это с уже существующими процессорами - я успешно извлекаю координаты широты и долготы в атрибуты с помощью процессора EvaluateJsonPath и успешно отправляю запросы в мое локальное хранилище данных postgis с помощью ExecuteSQL. Это оставляет мне ответы avro, которые я затем могу преобразовать в JSON с помощью процессора ConvertAvroToJSON.

У меня концептуальные проблемы с тем, как объединить результаты запроса вместе с исходным объектом JSON. Как бы то ни было, у меня есть два потоковых файла с одинаковым идентификатором фрагмента, которые я теоретически мог бы объединить вместе с помощью "mergecontent", но это дает мне:

{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}

Есть ли какие-либо предлагаемые стратегии для объединения результатов SQL-запроса в исходную структуру json, поэтому вместо этого мой результат будет примерно таким:

{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}

Я использую nifi 6.0, postgres 9.5.2 и postgis 2.2.1.

Я видел упоминание об использовании процессора replaceText в https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html - но это похоже на объединение содержимого из атрибута в тело содержимого. Мне не хватает точки объединения содержимого оригинала и содержимого ответа SQL или атрибутов, извлеченных из ответа SQL без содержимого.

Редактировать:

Похоже, что следующий сценарий Groovy делает то, что необходимо. Я не отличный программист, поэтому любые улучшения приветствуются.

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def originaljsontext = flowFile.getAttribute('original.json')
        def originaljson = slurper.parseText(originaljsontext)
        originaljson.put("point_polygon_info", obj)
        outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

person Josh Harrison    schedule 11.04.2016    source источник
comment
Посмотрите на issues.apache.org/jira/browse/NIFI-361, это должно помочь в стандартном способе выражения преобразований JSON.   -  person andrew    schedule 21.04.2016
comment
Отлично, спасибо @andrew - это будет очень полезно, так как многое из того, что я делаю с nifi, - это трансформация json!   -  person Josh Harrison    schedule 21.04.2016


Ответы (1)


Если ваш исходный JSON относительно невелик, возможный подход может быть следующим ...

  • Используйте ExtractText перед тем, как перейти к ExecuteSQL, чтобы скопировать исходный JSON в атрибут.
  • После ExecuteSQL и после ConvertAvroToJSON используйте процессор ExecuteScript для создания нового документа JSON, который объединяет оригинал из атрибута с результатами в содержимом.

Я не совсем уверен, что нужно сделать в сценарии, но я знаю, что другие успешно использовали Groovy и JsonSlurper через процессор ExecuteScript.

person Bryan Bende    schedule 12.04.2016
comment
После некоторой борьбы с groovy у меня есть кое-что, дающее удовлетворительные результаты - не такое чистое, как можно было бы надеяться, но оно работает :) Скрипт будет отредактирован в моем ответе. - person Josh Harrison; 13.04.2016