Я пытаюсь интегрировать Kafka с топологией Heron. Однако я не могу найти примеры с последней версией Heron (0.17.5). Есть ли какой-либо пример, которым можно поделиться, или какие-либо предложения о том, как реализовать собственный носик Kafka и болт Kafka?
Редактировать 1:
Я считаю, что KafkaSpout и KafkaBolt были намеренно объявлены устаревшими в Heron, чтобы уступить место новому API Streamlet. В настоящее время я пытаюсь выяснить, смогу ли я создать KafkaSource и KafkaSink с помощью Streamlet API. Однако я получаю следующее исключение, когда пытаюсь создать KafkaConsumer в Source.
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)
Редактировать 2:
Исправлена вышеуказанная проблема. Я инициализировал KafkaConsumer
в конструкторе, что было неправильно. Инициализация того же в методе setup()
исправила это.