Я нахожусь в среде, где rabbitmq уже предоставлены как заданная инфраструктура.
Служба A выполняет запись в очереди кроликов, а Служба B читает из очередей. Часть чтения в качестве потребителя через Spring Cloud Stream Binder работает как шарм после интенсивного чтения документация и правильно его настроить.
Однако я не могу настроить производителя, который будет записывать данные в очереди кроликов.
Настройка
RabbitMq (уже запущен)
- 1 Обмен: myExchange
- 3 очереди: myQueueA, myQueueB, myQueueC (ссылка на myExchange)
Продюсер-Сервис (вышеупомянутая Услуга А)
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(RabbitChannelSource.class)
public class RabbitSender
{
private final RabbitChannelSource rabbitChannelSource;
public void sendMessage()
{
Message<String> msg = MessageBuilder.withPayload("TEEEEST").build();
rabbitChannelSource.myOutput().send(msg);
}
public interface RabbitChannelSource
{
String MY_OUTPUT_BINDING = "my-output";
@Output(MY_OUTPUT_BINDING)
MessageChannel myOutput();
}
Я пытаюсь заставить его работать сначала для одной очереди, но в идеале я бы правильно установил свойства для всех трех очередей.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=pass
## use existing rabbitmq via bindings
spring.cloud.stream.bindings.my-output.destination=my-output
#spring.cloud.stream.bindings.my-output.group=myQueueA
spring.cloud.stream.bindings.my-output.producer.required-groups=myQueueA
spring.cloud.stream.rabbit.bindings.my-output.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.queueNameGroupOnly=true
bind-queue=false
и declare-exchange=false
необходимы, так как у меня есть кроличья инфраструктура.
Но, как бы то ни было, я всегда получаю одно и то же исключение, я не мог понять, почему. Я имею в виду, что знаю почему, потому что нет подходящего канала для отправки сообщений. Так что я подозреваю, что это как-то связано с application.properties
.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.my-output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={contentType=application/json, id=98698b4c-61fa-596d-736e-f630d3ba4626, timestamp=1605105272723}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at de.techem.emsreceiver.rabbitmq.RabbitSender.sendMessage(RabbitSender.java:25)
at de.techem.emsreceiver.event.TriggeredEmsImport.execute(TriggeredImport.java:95)
at de.techem.emsreceiver.event.TriggeredEmsImport$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at de.techem.emsreceiver.event.TriggeredEmsImport$$EnhancerBySpringCGLIB$$a0878aed.execute(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)
Я хочу иметь возможность писать в myQueueA, myQueueB or myQueueC
на основе routingKey. Поэтому, если я установил routingKey внутри своего приложения на myQA
, сообщение должно быть отправлено на myQueueA
, для myQB
затем на myQueueB
, а для myQC
затем на myQueueC
.
Таким образом, три разных ключа маршрутизации приводят к соответствующей очереди кроликов.
Я рад любому вкладу, так как я пробовал много вещей, которые не привели к успеху. Спасибо!
testSending()
? Я не вижу такого метода. - person Gary Russell   schedule 11.11.2020sendMessage()
, я его только что переименовал. Извините, это моя ошибка! - person anjey   schedule 12.11.2020