Контекст неактивного отслеживания не распространяется на Quarkus при использовании реактивного обмена сообщениями

У меня есть два микросервиса, которые взаимодействуют друг с другом через Kafka: один публикует сообщения, а другой их потребляет. И издатель, и потребитель работают на Quarkus (1.12.0.Final) и используют реактивный обмен сообщениями и Mutiny.

Режиссер:

package myproducer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;


@ApplicationScoped
public class Publisher {  
  @Channel("mytopic")
  @Inject
  public Emitter<MyAvro> myTopic;

  @Override
  public Uni<Void> publish(MyModel model) {
    MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);

    return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
                                                .addMetadata(toOutgoingKafkaRecordMetadata(avro))
                                                .withAck(() -> {
                                                     e.complete(null);
                                                     return CompletableFuture.completedFuture(null);
                                                })));
  }
}

Потребитель:

package myconsumer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class Consumer {

  @Incoming("mytopic")
  public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
    MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());

    return ...;
  }

}

Зависимости: включить среди прочего артефакты

  • Quarkus-smallrye-reactive-messaging-kafka
  • кваркус-рестай-бунт
  • Quarkus-smallrye-открывающий
  • кваркус-бунт
  • открывающий-кафка-клиент

Конфигурация Quarkus (application.properties): включает среди прочего

quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n

mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

При такой настройке никакие traceId или spanId не регистрируются вообще (даже если они должны быть в соответствии с руководством Quarkus по использованию OpenTracing). Только после добавления @ org.eclipse.microprofile.opentracing.Traced устанавливаются traceId и spanId, но оба совершенно не связаны друг с другом на производителе и потребителе.

Я проверил свою открытую конфигурацию по ранее упомянутому руководству Quarkus «Использование OpenTracing», но не обнаружил никаких намеков на неправильную конфигурацию с моей стороны. После прочтения обсуждений проблем в некоторых расширениях Quarkus, использующих ThreadLocals при использовании с Mutiny, я добавил артефакт quarkus-smallrye-context-spreading к своим зависимостям, но безрезультатно.

Я подозреваю, что проблема может быть связана с https://github.com/quarkusio/quarkus/issues/15182, хотя там речь идет о реактивных маршрутах, а не о реактивном обмене сообщениями.

Любые идеи?


person Pat    schedule 04.03.2021    source источник


Ответы (1)


Решить этот вопрос непросто, сначала я постараюсь объяснить, что происходит.

OpenTracing использует концепции транзакций и интервалов. Диапазон - это блок выполнения (метод, вызов базы данных, отправка в тему Kafka), тогда как транзакция - это распределенный процесс, охватывающий несколько компонентов (группа диапазонов).

Проблема здесь в том, что каждый раз, когда создается промежуток, он не находит никакой транзакции OpenTracing, поэтому создает новую. Вот почему ни один из ваших промежутков не коррелирует друг с другом.

В OpenTracing, когда вы создаете диапазон, вы создаете его на основе контекста диапазона. Каждая интеграция OpenTracing создает контекст диапазона на основе расширения технологии (я не нашел лучшего термина), например, контекст диапазона HTTP основан на заголовках HTTP, а контекст диапазона Kafka основан на Kafka. Заголовки.

Итак, чтобы коррелировать два промежутка, вам необходимо создать контекст диапазона с некоторым контекстом из базовой технологии, предоставляющей правильный OpenTracing ID.

Например, чтобы сопоставить два диапазона Kafka, вам необходимо иметь заголовок uber-trace-id (это имя по умолчанию идентификатора OpenTracing в Jaeger) с идентификатором трассировки (см. tracespan-identity для формата этого заголовка).

Зная это, нужно сделать несколько вещей.

Во-первых, вам нужно добавить заголовок uber-trace-id Kafka в исходящее сообщение в методе @Traced, чтобы сопоставить диапазон из метода с диапазоном, созданным внутри перехватчика производителя Kafka.

Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
        Long.toHexString(spanCtx.getSpanId()) + ":" +
        Long.toHexString(spanCtx.getParentId()) + ":" +
        Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id", openTracingId.getBytes());

Затем вам нужно сопоставить ваш @Traced метод с диапазоном входящего сообщения, если таковой имеется. Для этого проще всего добавить перехватчик CDI, который попытается создать контекст диапазона для всех методов, аннотированных с помощью @Traced, на основе параметров метода (он будет искать параметр Message). Чтобы это работало, этот перехватчик должен быть запущен перед перехватчиком OpenTracing и устанавливает контекст диапазона в контексте перехватчика.

Это наша реализация перехватчика, вы можете использовать ее или адаптировать под свои нужды.

public class KafkaRecordOpenTracingInterceptor {

    @AroundInvoke
    public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
        for (int i = 0 ; i < ctx.getParameters().length ; i++) {
            Object parameter = ctx.getParameters()[i];

            if (parameter instanceof Message) {
                Message message = (Message) parameter;

                Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
                    .map(IncomingKafkaRecordMetadata::getHeaders)
                    .get();
                SpanContext spanContext = getSpanContext(headers);
                ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT, spanContext);
            }
        }

        return ctx.proceed();
    }

    private SpanContext getSpanContext(Headers headers) {
        return TracingKafkaUtils.extractSpanContext(headers, GlobalTracer.get());
    }
}

Этот код использует как расширение Quarkus OpenTracing, так и библиотеку contrib Kafka OpenTracing.

Как при корреляции исходящего сообщения благодаря добавлению заголовка OpenTracing Kafka, созданного из текущего контекста диапазона, так и при создании контекста из заголовка входящего сообщения, корреляция должна происходить в любом случае.

person loicmathieu    schedule 22.03.2021