Я имею дело с объектами json, содержащими точки географических координат. Я хотел бы запустить эти точки на сервере postgis, который у меня есть локально, чтобы оценить точку в сопоставлении многоугольников.
Я надеюсь сделать это с уже существующими процессорами - я успешно извлекаю координаты широты и долготы в атрибуты с помощью процессора EvaluateJsonPath и успешно отправляю запросы в мое локальное хранилище данных postgis с помощью ExecuteSQL. Это оставляет мне ответы avro, которые я затем могу преобразовать в JSON с помощью процессора ConvertAvroToJSON.
У меня концептуальные проблемы с тем, как объединить результаты запроса вместе с исходным объектом JSON. Как бы то ни было, у меня есть два потоковых файла с одинаковым идентификатором фрагмента, которые я теоретически мог бы объединить вместе с помощью "mergecontent", но это дает мне:
{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}
Есть ли какие-либо предлагаемые стратегии для объединения результатов SQL-запроса в исходную структуру json, поэтому вместо этого мой результат будет примерно таким:
{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}
Я использую nifi 6.0, postgres 9.5.2 и postgis 2.2.1.
Я видел упоминание об использовании процессора replaceText в https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html - но это похоже на объединение содержимого из атрибута в тело содержимого. Мне не хватает точки объединения содержимого оригинала и содержимого ответа SQL или атрибутов, извлеченных из ответа SQL без содержимого.
Редактировать:
Похоже, что следующий сценарий Groovy делает то, что необходимо. Я не отличный программист, поэтому любые улучшения приветствуются.
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper
def flowFile = session.get();
if (flowFile == null) {
return;
}
def slurper = new JsonSlurper()
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def obj = slurper.parseText(text)
def originaljsontext = flowFile.getAttribute('original.json')
def originaljson = slurper.parseText(originaljsontext)
originaljson.put("point_polygon_info", obj)
outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)