Как читать тему Kafka построчно в программе Flink

Сначала я загружаю CSV-файл в тему Kafka и могу распечатать тему через программу Flink. Код выглядит следующим образом:

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", 
     "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
    prop.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 
     ("flinkTopic", new SimpleStringSchema(),prop);
    myConsumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

Мой вопрос: я хочу читать тему построчно и обрабатывать каждую строку отдельно, не могли бы вы подсказать мне, как я могу читать тему Kafka построчно?

Любая помощь могла бы быть полезна.


person M_Gh    schedule 24.06.2019    source источник
comment
Можете ли вы привести небольшой пример того, что вы хотите сделать? Насколько я понимаю, вы уже близки к решению. У вас уже есть поток, и вы можете применить к нему операцию типа map или filter.   -  person TobiSH    schedule 24.06.2019
comment
Уважаемый @TobiSH, у меня уже была тема Kafka, полная записей. Я могу распечатать всю тему с помощью приведенного выше кода; но я хочу каждый раз обращаться к одной записи для ее обработки. Подскажите, пожалуйста, как я могу это сделать?   -  person M_Gh    schedule 24.06.2019


Ответы (1)


В качестве примеров того, что вы можете сделать, я рекомендую вам пройти онлайн-курс Apache Flink Training. Вы можете использовать такие операции, как фильтр, карта, плоская карта, Windows и ProcessFunctions, для построчной обработки потока.

Вам может быть интересно, как удобно работать с данными CSV. Самый простой подход - использовать Table / SQL API, у которого есть Kafka Connector и Формат CSV.

Без использования механизма SQL Flink вы можете реализовать функцию карты, которая преобразует каждую строку текста в POJO. Вот пример этого здесь. Или реализуйте свой собственный де / сериализатор, который вы используете вместо SimpleStringSchema.

person David Anderson    schedule 24.06.2019