Добавить пользовательские заголовки в Spring Cloud Stream (с помощью Spring Reactor)

Поскольку я новичок в Spring Reactor, я пытаюсь передавать данные с помощью облачного потока Spring (используя rabbitMQ). Мне нужно добавить несколько настраиваемых заголовков, прежде чем сообщение будет отправлено в очередь.

Конфигурация моего весеннего облака:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

Ссылка на производителя:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

Заголовки успешно добавляются в сообщение, но где-то они переопределяются и не передаются потребителю.

Не мог бы кто-нибудь поделиться своими мыслями о том, как мы можем добавлять пользовательские заголовки в сообщение с помощью Spring Cloud Stream.

Заранее спасибо!


comment
Какую версию spring-cloud-stream вы используете? Подозреваю, что это не 3.х.   -  person Oleg Zhurakousky    schedule 18.02.2020
comment
Я использую версию Spring-Cloud-dependencies Hoxton.M3.   -  person ZeroTPS    schedule 19.02.2020


Ответы (1)


Пожалуйста, обновитесь до Hoxton.SR2, который принесет spring-cloud-stream 3.0.2.RELEASE. Были некоторые обновления, но вкратце сообщение, которое вы создаете, и его заголовок должны быть сохранены.

Дополнительное примечание: Кроме того, из-за добавленной поддержки нескольких аргументов функции ввода / вывода нам пришлось обновить соглашение об именах привязок для функций. Подробнее об этом можно прочитать здесь, но для вас это означает, что ваша конфигурация нуждается в быстром обновлении, поскольку input и output больше не используются по умолчанию, поэтому вам следует использовать имена, производные от имени функции

spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

. . . или вы можете сопоставить производные имена привязок с чем-то более описательным (например, input, output и т. д.) и использовать это имя вместо

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings: 
          processMessageaddHeaders-in-0: input  
          processMessageaddHeaders-out-0: output


person Oleg Zhurakousky    schedule 19.02.2020
comment
Спасибо за предложение, попробовал, но ничего не вышло. - person ZeroTPS; 19.02.2020
comment
Во время отладки я обнаружил, что внутри BeanFactoryAwareFunctionRegistry.class я вижу, что доходит до точки - ›Object v = value instanceof Message ? ((Message)value).getPayload() : value; - Требуется только полезная нагрузка и отбрасываются настраиваемые заголовки. - person ZeroTPS; 19.02.2020
comment
Я только что протестировал его, и он отлично работает, поэтому я не уверен. Может, вы разместите свой проект на github, и мы посмотрим? Кроме того, кода, который вы указываете на врагов, не существует, поэтому, возможно, вы смотрите на более старые версии. Убедитесь, что вы используете spring-cloud-stream 3.0.2.RELEASE. - person Oleg Zhurakousky; 19.02.2020
comment
Кроме того, я только что это видел, удали @EnableBinding(Processor.class) - person Oleg Zhurakousky; 19.02.2020