Добавить трассировку и идентификатор диапазона в задание Flink

У меня есть требование добавить идентификатор отслеживания и диапазона к заданиям Flink, работающим в кластере, запрос проходит примерно так, как показано ниже.

Пользователь -> Rest API -> Kafka-topic-1 -> FlinkJob-1 -> Kafka-topic-2 -> FlinkJob-2 -> Потребитель -> БД

Я использую Spring boot для создания своих API для отдыха и использую Spring Sleuth для добавления идентификатора отслеживания и диапазона в сгенерированные журналы, идентификатор отслеживания и диапазона добавляется при вызове rest API, а также когда сообщение помещается в Kakfa-topic-1 но я не могу понять, как добавить идентификатор трека и диапазона при использовании сообщения в FlinkJob-1 и FLinkJob-2, поскольку они находятся вне контекста Spring.

Один из способов - сделать отслеживание и привязку Id к заголовкам сообщений kafka и иметь перехватчик Kafka Consumer / Producer для извлечения и регистрации идентификатора отслеживания и диапазона, я пробовал это, но мои перехватчики не вызываются, поскольку API-интерфейсы Flink используют версию Kafka-client Flink.

Не удалось вызвать мою настраиваемую схему KafkaDeserializationSchema

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);

@Override
public TypeInformation<String> getProducedType() {
    System.out.println("************** Invoked 1");
    LOGGER.debug("************** Invoked 1");
    return null;
}

@Override
public boolean isEndOfStream(String nextElement) {
    System.out.println("************** Invoked 2");
    LOGGER.debug("************** Invoked 2");
    return true;
}

@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    System.out.println("************** Invoked 3");
    LOGGER.debug("************** Invoked 3");
    return record.toString();
}

 }

Может ли кто-нибудь предложить мне, как добиться того же.


comment
Не могли бы вы поделиться кодом, в котором вы используете свою схему?   -  person Dominik Wosiński    schedule 27.11.2019
comment
Есть способ, которым мало информации, чтобы помочь вам. Ответ @MlkCode должен работать. Вы используете ровно один раз? Если да, то какие у вас настройки контрольных точек?   -  person Arvid Heise    schedule 28.11.2019
comment
@Apollo Как вы на самом деле используете схему? Как передать его KafkaConsumer Флинка? Пожалуйста, поделитесь своим кодом конвейера. Не уверен, что вы имеете в виду под перехватчиками. Во flink вы создаете свой конвейер вручную. Нет никакой магии инъекций, как это иногда бывает весной.   -  person Dawid Wysakowicz    schedule 29.11.2019


Ответы (2)


Вы также можете использовать KafkaDeserializationSchema, чтобы получить заголовок

Для доступа к ключу, значению и метаданным сообщения Kafka в KafkaDeserializationSchema есть следующий метод десериализации T deserialize (запись ConsumerRecord).

public class Bla implements KafkaDeserializationSchema {
    @Override
    public boolean isEndOfStream(Object dcEvents) {
        return false;
    }

    @Override
    public Object deserialize(ConsumerRecord consumerRecord) throws Exception {
        return null;
    }



    @Override
    public TypeInformation<DCEvents> getProducedType() {
        return null;
    }
person MIkCode    schedule 24.11.2019
comment
Попробуйте выполнить итерацию по consumerRecord.headers (). вы не можете ToString () IT .... - person MIkCode; 25.11.2019
comment
На самом деле ни один из методов этого класса не вызывается, поэтому не могу получить заголовки сообщений, это может быть связано с тем, что я использую библиотеки FlinkKafka. - person Apollo; 26.11.2019

Здесь вы используете простую строку, а для сериализации байта в строку можно сделать что-то вроде приведенного ниже кода.

public class MyDeserializationSchema  implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new String(record.value(), "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
person redhatvicky    schedule 03.12.2019