Flink Kafka EXACTLY_ONCE, вызывающий исключение KafkaException ByteArraySerializer, не является экземпляром Serializer

Итак, я пытаюсь включить семантику EXACTLY_ONCE в моем потоковом задании Flink Kafka вместе с контрольными точками.

Однако я не могу заставить его работать, поэтому я попытался загрузить тестовый образец кода с Github: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Так что все работает нормально. Однако при включении контрольной точки возникают ошибки. Или, если я изменю семантику EXACTLY_ONCE на AT_LEAST_ONCE и включу контрольную точку, она будет работать нормально. Но затем, изменив его на EXACTLY_ONCE, я снова получаю эту ошибку.

Исключение, которое я получаю:

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
    ... 12 more

Я внес небольшие изменения в код, чтобы он работал в моей среде. Я запускаю его на игровой площадке флинк-операций внутри докера. (Этот https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html). Последняя версия, 1.10 и предоставленная кафка внутри, это версия 2.2.1.

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1_000);

        String inputTopic = "my-input";
        String outputTopic = "my-output";
        String kafkaHost = "kafka:9092";

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");


        DataStream<KafkaEvent> input = env
                .addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
                        .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                .keyBy("word")
                .map(new RollingAdditionMapper());

        input.addSink(
                new FlinkKafkaProducer<>(
                        outputTopic,
                        new KafkaEventSerializationSchema(outputTopic),
                        kafkaProps,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("Modern Kafka Example");
    }

Другие классы из примера можно найти: https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base

Я попытался изменить сериализацию, чтобы использовать KafkaSerializationSchema, а не пример кода, который использует SerializationSchema. Однако приведенный ниже код тоже не помог. Та же ошибка.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private String topic;

    public KafkaEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
        return new ProducerRecord<>(topic, element.toString().getBytes());
    }


}

Любая помощь приветствуется. Мне не удалось найти в Интернете рабочий код EXACTLY_ONCE garantuee между flink и kafka. Загружает только статьи, в которых говорится об этом, но не реальный рабочий код. Это все, что я пытаюсь здесь достичь.


person Wiggy Lindholm    schedule 19.06.2020    source источник
comment
Здесь (stackoverflow.com/a/58644689/2096986) они реализовали свой собственный KafkaSerializationSchema в классе ObjSerializationSchema. Я думаю, это может помочь решить вашу проблему, поскольку это ошибка сериализации.   -  person Felipe    schedule 19.06.2020
comment
KafkaEventSerializationSchema - это та, которую я использую в примере. Который я не могу связать прямо сейчас, кажется, github не работает. В любом случае он также расширяет KafkaSerializationSchema, как вы предлагаете. Я не могу понять, в чем проблема. Если сериализация была проблемой. Почему это работает, когда не используется EXACTLY_ONCE? и не работает при его использовании?   -  person Wiggy Lindholm    schedule 19.06.2020
comment
Итак, KafkaEventSerializationSchema - это созданный вами класс, реализующий KafkaSerializationSchema. В java-документах говорится, что это предостережение Please also implement KafkaContextAware if your serialization schema needs information about the available partitions and the number of parallel subtasks along with the subtask ID on which the Kafka Producer is running. ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/. Может быть, если вы опубликуете свой класс, будет легче определить ошибку.   -  person Felipe    schedule 19.06.2020
comment
Я обновлю пост. Я не могу разместить здесь код.   -  person Wiggy Lindholm    schedule 19.06.2020
comment
Я предполагаю, что это связано с тайм-аутом, когда вы используете EXACTLY_ONCE в соответствии с этим ответом stackoverflow.com/a/58648019/2096986   -  person Felipe    schedule 19.06.2020
comment
Но исключение не об этом. Если transaction.max.timeout.ms - это проблема. На что его нужно установить? если по умолчанию это 15 минут, и я получаю это исключение сразу же при попытке запустить задание. Я не понимаю, как это могло иметь тайм-аут в несколько секунд.   -  person Wiggy Lindholm    schedule 19.06.2020
comment
Я вижу ту же ошибку, но поведение для меня непоследовательно, одно и то же приложение может выйти из строя с этой ошибкой или работать нормально (exatly_once и контрольная точка включены в обоих случаях). В моем тестовом env, когда я включил ровно один раз, у меня сначала возникла ошибка, связанная с transaction.max.timeout.ms, которую я исправил, уменьшив производитель kafka transaction.timeout.ms, но это была другая ошибка).   -  person xrcsblue    schedule 19.06.2020
comment
@WiggyLindholm Удалось ли вам решить эту проблему?   -  person vikash dat    schedule 30.07.2020
comment
@ vikash-dat Не совсем. Я сделал много настроек, но ничего не помогло. Кажется, что исключения случаются все реже и реже, но иногда случаются. Я не знаю почему, и я получил код, предоставленный мне коллегой (которым я не могу поделиться публично), однако, насколько может видеть мой глаз. Практически то же самое, поэтому я не знаю, почему это работает сейчас или почему тогда не работало. Извините. Может быть, если в какой-то момент у меня будет время, я смогу попробовать получить рабочий пример и опубликовать его на github.   -  person Wiggy Lindholm    schedule 30.07.2020


Ответы (1)


Я столкнулся с той же проблемой, и явная установка тайм-аута для производителя помогла. properties.setProperty("transaction.timeout.ms", "900000");

person Ivan Letenko    schedule 22.06.2020