FlinkKafkaConsumer09 снова и снова читает некоторые сообщения

Я написал простую программу для чтения данных из Kafka и печати во flink. Ниже приведен код.

public static void main(String[] args) throws Exception {

    Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
    env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);

    flinkPipelineOptions.setJobName("MyFlinkTest");
    flinkPipelineOptions.setStreaming(true);
    flinkPipelineOptions.setCheckpointingInterval(1000L);
    flinkPipelineOptions.setNumberOfExecutionRetries(5);
    flinkPipelineOptions.setExecutionRetryDelay(3000L);

    Properties p = new Properties();
    p.setProperty("zookeeper.connect", "localhost:2181");
    p.setProperty("bootstrap.servers", "localhost:9092");
    p.setProperty("group.id", "test");

    FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p);

    DataStream<Notification> input = env.addSource(kafkaConsumer);

    input.rebalance().map(new MapFunction<Notification, String>() {
        @Override
        public String map(Notification value) throws Exception {
            return "Kafka and Flink says: " + value.toString();
        }

    }).print();

    env.execute();
}

Мне нужен flink для обработки моих данных в kafka ровно один раз, и у меня есть несколько вопросов о том, как это можно сделать.

  • Когда FlinkKafkaConsumer09 фиксирует обработанные смещения в kafka?
  • Скажем, в моей теме 10 сообщений, потребитель обрабатывает все 10 сообщений. Когда я останавливаю задание и запускаю его снова, оно начинает обрабатывать случайные сообщения из набора ранее прочитанных сообщений. Мне нужно убедиться, что ни одно из моих сообщений не обрабатывается дважды.

Пожалуйста посоветуй. Цените всю помощь. Спасибо.


person Neoster    schedule 15.07.2016    source источник


Ответы (1)


На этой странице описывается отношение гарантии отказоустойчивости коннектора Flink Kafka.

Вы можете использовать точки сохранения Flink, чтобы перезапустить задание ровно один раз (с сохранением состояния).

Причина, по которой вы снова видите сообщения, заключается в том, что смещения, совершенные Flink для брокера Kafka / Zookeeper, не соответствуют зарегистрированному состоянию Flink. Вы всегда будете видеть сообщения, обработанные несколько раз после восстановления / сбоя в Flink, даже если семантика включена ровно один раз. единовременная гарантия во Flink относятся к зарегистрированному состоянию, а не к записям, отправляемым операторам.


Немного не по теме: для чего эти строчки? На Flink они никуда не передаются.

Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);
flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);
person Robert Metzger    schedule 15.07.2016
comment
Спасибо за ответ rmetzger. Извините, flinkPipelineOptions был предоставлен для обработчика потока данных, и я забыл их удалить. Я прочитал руководства, на которые вы указали, но до сих пор не понимаю, как я могу сохранить состояние и возобновить его в следующий раз. Если возможно, мне действительно поможет любой пример. Спасибо и ценю вашу помощь. - person Neoster; 17.07.2016