Как обновить схему таблицы, когда во Flink есть новая схема Avro для данных Kafka?

Мы используем тему Kafka в приложении Flink, используя Flink Table API.

Когда мы впервые отправляем приложение, мы сначала читаем последнюю схему из нашего настраиваемого реестра. Затем создайте поток данных и таблицу Kafka, используя схему Avro. Реализация моих сериализаторов данных работает аналогично реестру схем Confluent, проверяя идентификатор схемы и затем используя реестр. Таким образом, мы можем применить правильную схему во время выполнения.

Однако я не знаю, как обновить схему таблицы и повторно выполнить SQL без повторного развертывания задания. Есть ли способ создать фоновый поток для проверки изменений схемы, и если они есть, приостанавливает текущее выполнение, обновляет схему таблицы и выполняет SQL.

Это будет особенно полезно для непрерывной доставки изменений схемы в приложения. У нас уже есть проверка на совместимость.


person lalala    schedule 14.11.2020    source источник


Ответы (1)


TL; DR вам не нужно ничего менять, чтобы он работал в большинстве случаев.

В Avro есть концепция схемы читателя и писателя. Схема Writer - это схема, которая использовалась для создания записи Avro и закодирована в полезную нагрузку (в большинстве случаев как идентификатор).

Схема читателя используется вашим приложением для понимания ваших данных. Если вы выполняете конкретный расчет, вы используете определенный набор полей записи Avro.

Теперь о хорошем: Avro прозрачно переводит схему записи в схему чтения, если они совместимы со схемой. Итак, пока ваши схемы полностью совместимы, есть способ всегда преобразовать схему записи в вашу схему чтения.

Поэтому, если ваша схема записей изменяется в фоновом режиме во время работы приложения, DeserializationSchema извлекает новую схему записи и выводит новое сопоставление со схемой чтения. Ваш запрос не заметит никаких изменений.


Этот подход неэффективен, если вы действительно хотите обогатить схему в своем приложении; например, вы всегда хотите добавить поле calculated и вернуть все остальные поля. Тогда новое добавленное поле не будет выбрано, так как схема вашего читателя фактически изменится. В этом случае вам нужно либо перезапустить, либо использовать общую схему записи.

person Arvid Heise    schedule 18.11.2020
comment
Я считаю, что то, что вы объясняете, правильно для DataStream. DeserializationSchema получит новую схему для десериализации событий. Однако мой вопрос касался Table API. При создании Table API требуется статическая схема. Я полагаю, что записи будут десериализованы DeserializationSchema, но новые поля не будут видны пользователям, отправляющим SQL. Я полагаю, SELECT newfield from MyTable потерпит неудачу. - person lalala; 18.11.2020
comment
Table также использует DeserializationSchema, и все действительно для Table API. Однако в вашем примере вы фактически изменили схему читателя. Это возможно только в том случае, если вы повторно зарегистрируете таблицу для обновления схемы считывателя. - person Arvid Heise; 19.11.2020