У меня есть два микросервиса, которые взаимодействуют друг с другом через Kafka: один публикует сообщения, а другой их потребляет. И издатель, и потребитель работают на Quarkus (1.12.0.Final) и используют реактивный обмен сообщениями и Mutiny.
Режиссер:
package myproducer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
@ApplicationScoped
public class Publisher {
@Channel("mytopic")
@Inject
public Emitter<MyAvro> myTopic;
@Override
public Uni<Void> publish(MyModel model) {
MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);
return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
.addMetadata(toOutgoingKafkaRecordMetadata(avro))
.withAck(() -> {
e.complete(null);
return CompletableFuture.completedFuture(null);
})));
}
}
Потребитель:
package myconsumer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class Consumer {
@Incoming("mytopic")
public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());
return ...;
}
}
Зависимости: включить среди прочего артефакты
- Quarkus-smallrye-reactive-messaging-kafka
- кваркус-рестай-бунт
- Quarkus-smallrye-открывающий
- кваркус-бунт
- открывающий-кафка-клиент
Конфигурация Quarkus (application.properties): включает среди прочего
quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n
mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
При такой настройке никакие traceId или spanId не регистрируются вообще (даже если они должны быть в соответствии с руководством Quarkus по использованию OpenTracing). Только после добавления @ org.eclipse.microprofile.opentracing.Traced устанавливаются traceId и spanId, но оба совершенно не связаны друг с другом на производителе и потребителе.
Я проверил свою открытую конфигурацию по ранее упомянутому руководству Quarkus «Использование OpenTracing», но не обнаружил никаких намеков на неправильную конфигурацию с моей стороны. После прочтения обсуждений проблем в некоторых расширениях Quarkus, использующих ThreadLocals при использовании с Mutiny, я добавил артефакт quarkus-smallrye-context-spreading к своим зависимостям, но безрезультатно.
Я подозреваю, что проблема может быть связана с https://github.com/quarkusio/quarkus/issues/15182, хотя там речь идет о реактивных маршрутах, а не о реактивном обмене сообщениями.
Любые идеи?