Набор данных flink 1.7.2 не поддерживает приемник кафка?

Набор данных flink 1.7.2 не поддерживает приемник кафка?

После выполнения пакетной операции мне нужно опубликовать сообщение в kafka, то есть источник - мой postgres, а приемник - моя kafka.

Является ли это возможным ?


person Avinash Tripathy    schedule 20.05.2020    source источник


Ответы (2)


Вы можете создать свой собственный выходной формат и использовать Kafka Producer для вывода в Kafka. Проверьте код ниже.

...
data.output(new KafkaOPFormat());
env.execute();
import java.io.IOException;
import java.util.Properties;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {

  private final Properties properties = new Properties();
  private KafkaProducer<String, String> producer;

  @Override
  public void configure(Configuration configuration) {
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  }

  @Override
  public void open(int i, int i1) throws IOException {
    producer = new KafkaProducer<String, String>(properties);
  }

  @Override
  public void writeRecord(Tuple2<String, String> record) throws IOException {
    producer.send(new ProducerRecord<>(record.f0, record.f1));
  }

  @Override
  public void close() throws IOException {
    producer.close();
  }
}

PS: Я не помню все конфигурации, обязательно проверьте вашу конфигурацию и внесите соответствующие изменения.

person Anurag Anand    schedule 20.05.2020

Из коробки: еще нет, вам придется работать с потоками данных с самого начала или создавать свой собственный настраиваемый формат вывода, как упоминалось.

Однако есть планы унифицировать API DataSet и DataStream в проекте Apache Flink в долгосрочной перспективе в Flink 2.0: https://flink.apache.org/roadmap.html

person Dave Canton    schedule 20.05.2020