Прочтите все записи в таблице и сохраните их в MapState во Flink только один раз.

У меня есть таблица Metadata.

Мне нужно содержимое таблицы в моем приложении Flink. Итак, я хочу прочитать все записи в таблице и сохранить в MapState<Metadata::Id, Metadata>.

Если мое приложение перезапустится, я не хочу читать из таблицы, вместо этого я буду читать из MapState<Metadata::Id, Metadata> и использовать его.

Есть ли способ добиться этого?


person Logic    schedule 22.11.2020    source источник


Ответы (1)


Видео на YouTube и репозиторий github, на которые я ссылался в этом ответе, охватывают ряд похожих сценариев. Но лучший способ загрузить состояние Flink - это предварительно загрузить данные в точку сохранения с помощью тега API обработчика состояний.

Имейте в виду, что MapState Flink - это своего рода состояние с разделением по ключам. Таким образом, если вы используете MapState<Metadata::Id, Metadata>, это фактически Map<KEY, MapState<Metadata::Id, Metadata>>, сегментированный по кластеру с помощью KEY.

Вот пример, показывающий, как создать точку сохранения, содержащую ValueState<Integer>:

public class Bootstrap {
    public static void main( String[] args ) throws Exception {
        ExecutionEnvironment bEnv =
                ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Integer> transform =
                OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
                        .keyBy(String::valueOf)
                        .transform(new SimplestTransform());

        Savepoint
                .create(new FsStateBackend("file:///tmp/checkpoints"), 256)
                .withOperator("my-operator-uid", transform)
                .write("file:///tmp/savepoints/");

        bEnv.execute();
    }

    static public class SimplestTransform
            extends KeyedStateBootstrapFunction<String, Integer> {
        ValueState<Integer> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> descriptor = new
                    ValueStateDescriptor<>("total", Types.INT);
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            state.update(value);
        }
    }
}

Это создает сегментированную карту ключ / значение, содержащую {"1": 1, "2": 2, "3": 3}.

person David Anderson    schedule 23.11.2020