Интеграция Kafka в Apache Heron

Я пытаюсь интегрировать Kafka с топологией Heron. Однако я не могу найти примеры с последней версией Heron (0.17.5). Есть ли какой-либо пример, которым можно поделиться, или какие-либо предложения о том, как реализовать собственный носик Kafka и болт Kafka?

Редактировать 1:

Я считаю, что KafkaSpout и KafkaBolt были намеренно объявлены устаревшими в Heron, чтобы уступить место новому API Streamlet. В настоящее время я пытаюсь выяснить, смогу ли я создать KafkaSource и KafkaSink с помощью Streamlet API. Однако я получаю следующее исключение, когда пытаюсь создать KafkaConsumer в Source.

Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)

Редактировать 2:

Исправлена ​​вышеуказанная проблема. Я инициализировал KafkaConsumer в конструкторе, что было неправильно. Инициализация того же в методе setup() исправила это.


person Daniccan    schedule 29.09.2018    source источник
comment
Если Heron примерно совместим со Storm, какие конкретно проблемы у вас возникают?   -  person OneCricketeer    schedule 29.09.2018
comment
KafkaSpout, предоставляемый Storm, устарел.   -  person Daniccan    schedule 29.09.2018
comment
Согласно чему? Документация дает альтернативу?   -  person OneCricketeer    schedule 30.09.2018
comment
Streamlet API кажется альтернативой Spout и Bolt.   -  person Daniccan    schedule 30.09.2018


Ответы (1)


Мне удалось сделать это с помощью Streamlet API для Heron. Я публикую то же самое здесь. Надеюсь, это поможет другим, столкнувшимся с той же проблемой.

Исходный код Кафки

public class KafkaSource implements Source {

    private String streamName;

    private Consumer<String, String> kafkaConsumer;
    private List<String> kafkaTopic;

    private static final Logger LOGGER = Logger.getLogger("KafkaSource");

    @Override
    public void setup(Context context) {

        this.streamName = context.getStreamName();

        kafkaTopic = Arrays.asList(KafkaProperties.KAFKA_TOPIC);

        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.BOOTSTRAP_SERVERS);
        props.put("group.id", KafkaProperties.CONSUMER_GROUP_ID);
        props.put("enable.auto.commit", KafkaProperties.ENABLE_AUTO_COMMIT);
        props.put("auto.commit.interval.ms", KafkaProperties.AUTO_COMMIT_INTERVAL_MS);
        props.put("session.timeout.ms", KafkaProperties.SESSION_TIMEOUT);
        props.put("key.deserializer", KafkaProperties.KEY_DESERIALIZER);
        props.put("value.deserializer", KafkaProperties.VALUE_DESERIALIZER);
        props.put("auto.offset.reset", KafkaProperties.AUTO_OFFSET_RESET);
        props.put("max.poll.records", KafkaProperties.MAX_POLL_RECORDS);
        props.put("max.poll.interval.ms", KafkaProperties.MAX_POLL_INTERVAL_MS);

        this.kafkaConsumer = new KafkaConsumer<>(props);

        kafkaConsumer.subscribe(kafkaTopic);
    }

    @Override
    public Collection get() {

        List<String> kafkaRecords = new ArrayList<>();

        ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);

        for (ConsumerRecord<String, String> record : records) {
            String rVal = record.value();
            kafkaRecords.add(rVal);
        }

        return kafkaRecords;
    }

    @Override
    public void cleanup() {
        kafkaConsumer.wakeup();
    }
}
person Daniccan    schedule 30.09.2018
comment
Знаете ли вы, как интегрировать KafkaSpout в Heron, используя Java API, а не Streamlet API? - person Yitian Zhang; 29.10.2018