Получение вложенных полей из сообщения Kafka с помощью Apache Flink SQL

Я пытаюсь создать исходную таблицу с помощью Apache Flink 1.11, где я могу получить доступ к вложенным свойствам в сообщении JSON. Я могу извлекать значения из корневых свойств, но не знаю, как получить доступ к вложенным объектам.

документация предполагает, что это должен быть тип MAP, но когда я его устанавливаю, я получаю следующую ошибку

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

Вот мой SQL

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

А мой JSON выглядит примерно так:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}

person bash721    schedule 23.09.2020    source источник


Ответы (2)


Вы можете использовать ROW для извлечения вложенных полей в сообщениях JSON. Ваш оператор DDL будет выглядеть примерно так:

CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
person morsapaes    schedule 24.09.2020
comment
Отлично, работает! Как насчет глубоко вложенного JSON, кажется, это может стать громоздким? Есть ли лучшие подходы для работы с данными JSON с использованием PyFlink и SQL? - person bash721; 24.09.2020
comment
На данный момент нет лучшего способа сделать это, AFAIK. Однако в ближайшее время планируется поддержка функций SQL JSON в Flink SQL: cwiki.apache.org/confluence/pages/ Это должно значительно упростить работу! - person morsapaes; 24.09.2020

Вы также можете попробовать

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

Единственная разница: MAP<STRING, STRING> vs MAP

person Yik San Chan    schedule 26.02.2021