Я новичок в spring-xd, и мне нужен совет по созданию собственного приемника. В частности, я хотел бы создать приемник, который регистрирует Websocket. Общая идея заключается в том, что
module upload --file "websocket-sink-0.0.1-SNAPSHOT.jar" --type "sink" --name "websocket-sink"
stream create --name "websocket-sink-test" --definition "http --port=9191 | websocket-sink --port=9292" --deploy
следует установить модуль приемника и создать поток, который принимает ввод HTTP на порт 9191
и отправляет полезную нагрузку в приемник веб-сокетов, где клиенты могут подключаться (через порт 9292
) и использовать эти данные.
Я в значительной степени следовал рекомендациям в документации и использовал учебник по веб-сокетам Spring Integration из технических советов Джоша Лонга (https://github.com/joshlong/techtips/tree/master/examples/spring-integration-4.1-websockets-example). Так вот что я придумал
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Configuration
@EnableIntegration
@ComponentScan
@EnableAutoConfiguration
@RestController
public class WebsocketSink {
@Bean
ServerWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/messages").withSockJs();
}
@Bean
MessageHandler webSocketOutboundAdapter() {
return new WebSocketOutboundMessageHandler(serverWebSocketContainer());
}
@Bean
MessageChannel input() {
return new DirectChannel();
}
@Bean
IntegrationFlow webSocketFlow() {
Function<Message, Object> splitter = m -> serverWebSocketContainer()
.getSessions()
.keySet()
.stream()
.map(s -> MessageBuilder.fromMessage(m)
.setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, s)
.build())
.collect(Collectors.toList());
return IntegrationFlows
.from(input())
.split(Message.class, splitter)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.handle(webSocketOutboundAdapter()).get();
}
}
я думаю, что некоторые аннотации уровня класса (например, @ComponentScan
или @RestController
) могут не потребоваться. Но чего мне не хватает (концептуально), так это того, где и как клиенты веб-сокетов будут подключаться к веб-сокету, открытому приемником (то есть, как я могу указать порт, который следует использовать для контейнера, в котором размещен веб-сокет в реализации приемника, например 9292
в определении потока сверху).
Я немного потерялся здесь, поэтому я был бы признателен за любой совет.