создание собственного приемника веб-сокетов для spring-xd

Я новичок в spring-xd, и мне нужен совет по созданию собственного приемника. В частности, я хотел бы создать приемник, который регистрирует Websocket. Общая идея заключается в том, что

module upload --file "websocket-sink-0.0.1-SNAPSHOT.jar" --type "sink" --name "websocket-sink"
stream create --name "websocket-sink-test" --definition "http --port=9191 | websocket-sink --port=9292" --deploy

следует установить модуль приемника и создать поток, который принимает ввод HTTP на порт 9191 и отправляет полезную нагрузку в приемник веб-сокетов, где клиенты могут подключаться (через порт 9292) и использовать эти данные.

Я в значительной степени следовал рекомендациям в документации и использовал учебник по веб-сокетам Spring Integration из технических советов Джоша Лонга (https://github.com/joshlong/techtips/tree/master/examples/spring-integration-4.1-websockets-example). Так вот что я придумал

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Executors;
import java.util.stream.Collectors;


@Configuration
@EnableIntegration
@ComponentScan
@EnableAutoConfiguration
@RestController
public class WebsocketSink {

    @Bean
    ServerWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/messages").withSockJs();
    }

    @Bean
    MessageHandler webSocketOutboundAdapter() {
        return new WebSocketOutboundMessageHandler(serverWebSocketContainer());
    }

    @Bean
    MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    IntegrationFlow webSocketFlow() {
        Function<Message, Object> splitter = m -> serverWebSocketContainer()
                .getSessions()
                .keySet()
                .stream()
                .map(s -> MessageBuilder.fromMessage(m)
                        .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, s)
                        .build())
                .collect(Collectors.toList());

        return IntegrationFlows
                .from(input())
                .split(Message.class, splitter)
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                .handle(webSocketOutboundAdapter()).get();
    }

}

я думаю, что некоторые аннотации уровня класса (например, @ComponentScan или @RestController) могут не потребоваться. Но чего мне не хватает (концептуально), так это того, где и как клиенты веб-сокетов будут подключаться к веб-сокету, открытому приемником (то есть, как я могу указать порт, который следует использовать для контейнера, в котором размещен веб-сокет в реализации приемника, например 9292 в определении потока сверху).

Я немного потерялся здесь, поэтому я был бы признателен за любой совет.


person omoser    schedule 24.03.2015    source источник


Ответы (1)


Адаптер Spring Integration WebSocket в техническом совете Джоша должен работать в веб-контейнере (tomcat и т. д.). Так что в настоящее время он не поддерживается в XD из коробки.

Источник http использует настраиваемый адаптер HTTP-канала на основе netty, поэтому он не имеет такой же проблемы (стандартные входящие конечные точки HTTP Spring Integration также должны работать в контейнере).

Не стесняйтесь открывать новую функцию JIRA issue.

person Gary Russell    schedule 25.03.2015
comment
Спасибо Гэри. Дело в веб-контейнере в основном то, что я предполагал. Кажется, уже существует проблема JIRA для xd-native websocket-приемников. Однако, поскольку по этому вопросу нет временных рамок, я был бы рад помочь. Таким образом, концептуально нам нужно запустить встроенный веб-контейнер в реализации приемника и связать ServerWebSocketContainer? Можно ли использовать @EnableAutoConfiguration или другие функции, связанные с Spring Boot, для настройки содержимого веб-контейнера? - person omoser; 25.03.2015
comment
Я бегло просматриваю прототип XD-2398, используя netty; Мне нужно подумать еще, но я подозреваю, что добавление встроенного веб-контейнера в приемник будет нетривиальным - входной канал модуля должен быть каким-то образом подключен к контексту веб-сокета. - person Gary Russell; 25.03.2015
comment
Вы можете взглянуть на тесты. Главной особенностью является @Bean вместо TomcatWebSocketTestServer. Надеюсь, это как-то поможет - person Artem Bilan; 25.03.2015
comment
Спасибо за все Ваши ответы. Я собрал очень простую реализацию приемника, которая использует netty для части веб-сокета: xd-netty-websocket-sink Код в значительной степени основан на примерах netty websocket и определенно нуждается в улучшении. Но он охватывает основной вариант использования, который у меня есть. Предложения очень приветствуются :-) - person omoser; 26.03.2015