У меня есть требование добавить идентификатор отслеживания и диапазона к заданиям Flink, работающим в кластере, запрос проходит примерно так, как показано ниже.
Пользователь -> Rest API -> Kafka-topic-1 -> FlinkJob-1 -> Kafka-topic-2 -> FlinkJob-2 -> Потребитель -> БД
Я использую Spring boot для создания своих API для отдыха и использую Spring Sleuth для добавления идентификатора отслеживания и диапазона в сгенерированные журналы, идентификатор отслеживания и диапазона добавляется при вызове rest API, а также когда сообщение помещается в Kakfa-topic-1 но я не могу понять, как добавить идентификатор трека и диапазона при использовании сообщения в FlinkJob-1 и FLinkJob-2, поскольку они находятся вне контекста Spring.
Один из способов - сделать отслеживание и привязку Id к заголовкам сообщений kafka и иметь перехватчик Kafka Consumer / Producer для извлечения и регистрации идентификатора отслеживания и диапазона, я пробовал это, но мои перехватчики не вызываются, поскольку API-интерфейсы Flink используют версию Kafka-client Flink.
Не удалось вызвать мою настраиваемую схему KafkaDeserializationSchema
public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);
@Override
public TypeInformation<String> getProducedType() {
System.out.println("************** Invoked 1");
LOGGER.debug("************** Invoked 1");
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
System.out.println("************** Invoked 2");
LOGGER.debug("************** Invoked 2");
return true;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
System.out.println("************** Invoked 3");
LOGGER.debug("************** Invoked 3");
return record.toString();
}
}
Может ли кто-нибудь предложить мне, как добиться того же.