Почему информация трассировки не распространяется по сообщениям kafka, когда Spring Sleuth находится в пути к классам?

Информация трассировки не распространяется по сообщениям kafka из-за того, что метод SleuthKafkaAspect.wrapProducerFactory () не запускается. На стороне производителя сообщение отправляется правильно, и информация трассировки правильно регистрируется. На стороне потребителя вместо этого создаются новые traceId и spanId.

Следующие две строки журнала показывают разные значения для traceId, spanId (и parentId):

2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO  my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'

Сначала, используя Krafdrop, а также отладку, я убедился, что заголовок сообщения не содержит никакой информации для отслеживания.

После этого я понял, что метод SleuthKafkaAspect.wrapProducerFactory () никогда не запускается, вместо этого на стороне потребителя есть метод SleuthKafkaAspect.anyConsumerFactory ().

Используются следующие версии библиотек:

  • пружинный пыльник: 2.3.7.RELEASE
  • весенняя облачная бомба: Hoxton.SR10
  • весеннее облако: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весенняя кафка: 2.5.10.RELEASE
  • клиент какфа: 2.4.1
  • весеннее облако-стартер-сыщик: 2.2.7.RELEASE
  • весна-облако-сыщик-зипкин: 2.2.7.RELEASE

Версия клиентской библиотеки kakfa - 2.4.1 из-за понижения версии, связанной с производственной ошибкой в ​​версии 2.5.1 клиента kafka, которая увеличивает использование процессора. Я также безуспешно пытался использовать следующие комбинации версий библиотек:

  • spring boot: 2.3.7.RELEASE
  • весенняя облачная бомба: Hoxton.SR10 (и Hoxton.SR8)
  • весеннее облако: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весенняя кафка: 2.5.10.RELEASE
  • клиент какфа: 2.5.1
  • весеннее облако-стартер-сыщик: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весна-облако-сыщик-зипкин: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • spring boot: 2.3.7.RELEASE
  • весенняя облачная бомба: Hoxton.SR10 (и Hoxton.SR8)
  • весеннее облако: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весенняя кафка: 2.5.10.RELEASE
  • клиент какфа: 2.6.0
  • весеннее облако-стартер-сыщик: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весна-облако-сыщик-зипкин: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • spring boot: 2.3.7.RELEASE
  • весенняя облачная бомба: Hoxton.SR10 (и Hoxton.SR8)
  • весеннее облако: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весенняя кафка: 2.6.x
  • клиент какфа: 2.6.0
  • весеннее облако-стартер-сыщик: 2.2.7.RELEASE (и 2.2.5.RELEASE)
  • весна-облако-сыщик-зипкин: 2.2.7.RELEASE (и 2.2.5.RELEASE)

Мы перенесли наш проект на другую версию весенней загрузки, с 2.3.0.RELEASE на 2.3.7.RELEASE. Раньше все работало правильно. Ниже старых версий библиотек:

  • весенняя загрузка: 2.3.0.RELEASE
  • spring-kafka: 2.5.0.RELEASE
  • кафка-клиенты: 2.4.1
  • весеннее облако: 2.2.5.RELEASE
  • весна-облако-стартер-сыщик: 2.2.5.RELEASE
  • весна-облако-сыщик-зипкин: 2.2.5.RELEASE

Мы также представили log42 / log4j (до этого был slf4j с логбэком).

Ниже связанных библиотек:

- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test

Настроены следующие свойства:

spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Класс конфигурации для создания ProducerFactory следующий:


@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(
            KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }


    private ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> producerConfigs() {

        Map<String, Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configs;
    }

}

Мой класс весеннего загрузочного приложения:


@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},
        exclude = {
                DataSourceAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
        "my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class, args);
    }

}

Здесь вы можете найти вывод команды mvn dependency: tree mvn_dependency_tree.txt


person user1800752    schedule 23.03.2021    source источник
comment
перекрестная публикация: github.com/spring-cloud/spring-cloud- сыщик / вопросы / 1886   -  person Jonatan Ivanov    schedule 25.03.2021


Ответы (1)


Как указано в документации, вам необходимо создать ProducerFactory bean, если вы хотите использовать свой собственный KafkaTemplate:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object>producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object>producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
person Jonatan Ivanov    schedule 31.03.2021