Создание динамического окна flink путем чтения деталей из kafka

Скажем, сообщения Kafka содержат конфигурацию размера окна flink.

Я хочу прочитать сообщение от Kafka и создать глобальное окно во flink.

Постановка проблемы:

Можем ли мы справиться с описанным выше сценарием с помощью BroadcastStream?

Or

Любой другой подход, который поддержит вышеуказанный случай?


person Avinash Tripathy    schedule 20.01.2020    source источник
comment
Вы пытаетесь изменить размер окна во время выполнения задания или просто инициализировать окно из данных, предоставленных через Kafka?   -  person David Anderson    schedule 20.01.2020
comment
На основе сообщения конфигурации. Он должен быть инициализирован, если он содержит новую информацию или если это та же информация с новым размером окна, тогда он должен изменить размер окна без перезапуска.   -  person Avinash Tripathy    schedule 20.01.2020


Ответы (1)


API окна Flink не поддерживает динамическое изменение размеров окон.

Что вам нужно сделать, так это реализовать собственное оконное управление с помощью функции процесса. В этом случае KeyedBroadcastProcessFunction, в которой транслируется конфигурация окна.

Вы можете изучить обучение Flink. для примера того, как реализовать временные окна с помощью KeyedProcessFunction (скопировано ниже):

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
} 

person David Anderson    schedule 20.01.2020