Как настроить задания flink во время выполнения?

Можно ли настроить флинк-приложение во время выполнения? Например, у меня есть потоковое приложение, которое считывает ввод, выполняет некоторые преобразования, а затем отфильтровывает все элементы ниже определенного порога. Однако я хочу, чтобы этот порог можно было настраивать во время выполнения, что означает, что я могу изменить его, не перезапуская свое задание flink. Пример кода:

DataStream<MyModel> myModelDataStream = // get input ...
                // do some stuff ...
                .filter(new RichFilterFunction<MyModel>() {
                    @Override
                    public boolean filter(MyModel value) throws Exception {
                        return value.someValue() > someGlobalState.getThreshold();
                    }
                })
                // write to some sink ...

DataStream<MyConfig> myConfigDataStream = // get input ...
                // ...
                .process(new RichProcessFunction<MyConfig>() {
                      someGlobalState.setThreshold(MyConfig.getThreshold());
                })
                // ...

Есть ли возможность этого добиться? Как глобальное состояние, которое можно изменить, например, с помощью потока конфигурации.


person Horrorente    schedule 28.06.2017    source источник


Ответы (1)


Да, вы можете сделать это с помощью BroadcastProcessFunction. Примерно так:

    MapStateDescriptor<Void, Threshold> bcStateDescriptor = new MapStateDescriptor<>(
    "thresholds", Types.VOID, Threshold.class);

    DataStream<MyModel> myModelDataStream = // get input ...
    DataStream<Threshold> thresholds = // get input...
    BroadcastStream<Threshold> controlStream = thresholds.broadcast(bcStateDescriptor);

    DataStream<MyModel> result = myModelDataStream
      .connect(controlStream)
      .process(new MyFunction());

    public class MyFunction extends BroadcastProcessFunction<MyModel, Long, MyModel> {    
        @Override
        public void processBroadcastElement(Threshold newthreshold, Context ctx, Collector<MyModel> out) {
            BroadcastState<Void, Threshold> bcState = ctx.getBroadcastState(new MapStateDescriptor<>("thresholds", Types.VOID, Threshold.class));  
            bcState.put(null, newthreshold);
        }

        @Override
        public void processElement(MyModel model, Collector<MyModel> out) {
            Threshold threshold = ctx.getBroadcastState(new MapStateDescriptor<>("threshold", Types.VOID, Threshold.class)).get(null);
            if (threshold.value() == null || model.getData() > threshold.value()) {
                out.collect(model);
            }
        }
    }
person David Anderson    schedule 28.06.2017
comment
на основании того, что вы сказали, могу ли я поместить некоторые переменные в свой RuntimeContext и использовать Rich-функции для доступа к ним? Например, в моем случае у меня есть класс с некоторыми конфигурациями. - person Michieru; 20.09.2019
comment
Пожалуйста, задайте новый вопрос, так как это звучит несвязанно и слишком сложно ответить в комментарии. - person David Anderson; 20.09.2019