Поскольку я новичок в Spring Reactor, я пытаюсь передавать данные с помощью облачного потока Spring (используя rabbitMQ). Мне нужно добавить несколько настраиваемых заголовков, прежде чем сообщение будет отправлено в очередь.
Конфигурация моего весеннего облака:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
Ссылка на производителя:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
Заголовки успешно добавляются в сообщение, но где-то они переопределяются и не передаются потребителю.
Не мог бы кто-нибудь поделиться своими мыслями о том, как мы можем добавлять пользовательские заголовки в сообщение с помощью Spring Cloud Stream.
Заранее спасибо!