Я написал простую программу для чтения данных из Kafka и печати во flink. Ниже приведен код.
public static void main(String[] args) throws Exception {
Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);
Properties p = new Properties();
p.setProperty("zookeeper.connect", "localhost:2181");
p.setProperty("bootstrap.servers", "localhost:9092");
p.setProperty("group.id", "test");
FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p);
DataStream<Notification> input = env.addSource(kafkaConsumer);
input.rebalance().map(new MapFunction<Notification, String>() {
@Override
public String map(Notification value) throws Exception {
return "Kafka and Flink says: " + value.toString();
}
}).print();
env.execute();
}
Мне нужен flink для обработки моих данных в kafka ровно один раз, и у меня есть несколько вопросов о том, как это можно сделать.
- Когда FlinkKafkaConsumer09 фиксирует обработанные смещения в kafka?
- Скажем, в моей теме 10 сообщений, потребитель обрабатывает все 10 сообщений. Когда я останавливаю задание и запускаю его снова, оно начинает обрабатывать случайные сообщения из набора ранее прочитанных сообщений. Мне нужно убедиться, что ни одно из моих сообщений не обрабатывается дважды.
Пожалуйста посоветуй. Цените всю помощь. Спасибо.