Akka HTTP - Websocket - истинный двунаправленный сценарий

Попытка создать настоящий двунаправленный сервер websocket с помощью akka http и akka-stream

  • Сервер ответит на запрос, когда ответ будет готов
  • Сервер ответит на запрос несколькими ответами, когда они будут готовы
  • Сервер отправит уведомление без каких-либо запросов

Официальный https://doc.akka.io/docs/akka-http/current/server-side/websocket-support.html#handling-messages не совсем понятен.

Создание маршрута для сервера

  public Route createRoute() {
    return path("subscription", () ->
            get(() ->
                concat(
                    handleWebSocketMessages(subscriptionFlow()))));
  }

  public Flow<Message, Message, NotUsed> subscriptionFlow() {

Я считаю, что вам нужно вернуть поток, который обрабатывал бы входящие / исходящие сообщения. Думая, что для сообщений нет запроса / ответа, а есть только слабое звено, мне понадобится отдельный приемник для запросов и отдельный источник для отправки ответов, хотя приемник все еще должен знать об источнике, чтобы позже он мог знать, кому этот ответ отправлен на адрес.

Я нашел только примеры запроса / ответа или приемник полностью игнорируется и, возможно, один более старый пример https://markatta.com/codemonkey/posts/chat-with-akka-http-websockets-old/

Я думаю использовать Flow.fromSinkAndSourceCoupled и, возможно, иметь актера, который создается для каждого подключения к веб-сокету.

Я не могу реализовать идею Sink.actorRefWithBackpressure, Source.actorRefWithBackpressure, Flow.fromSinkAndSourceCoupled и создания актера для каждого подключения к веб-сокету.

В типизированном актере (в настоящее время 2.6.3) я не могу найти способ создать актера для каждого подключения к веб-сокету, как в старом примере.

val userActor = system.actorOf(Props(new User(chatRoom)))

Есть ли пример в проектах akka / akka-stream / akka-http или где-нибудь еще, где показана эта функция?


person Adrian I    schedule 14.02.2020    source источник
comment
Этот вопрос довольно обширен, и на него нетрудно ответить. Однако Flow достаточно для всех ваших нужд. Flow может выдавать сообщения после асинхронной обработки входных данных. Flow может отправлять сообщения, даже не получая ввода.   -  person Alec    schedule 15.02.2020
comment
Вот обновленная ветка этого старого образца с использованием новых API-интерфейсов Actor из версии 2.6 и последней версии Akka HTTP: https://github.com/johanandren/chat-with-akka-http-websockets/tree/akka-2.6 на случай, если это поможет   -  person johanandren    schedule 18.02.2020
comment
@johanandren безопасно ли использовать контекст актора-хранителя здесь. Я понимаю, что функция будет вызываться при создании нового соединения (вызывается ли newUser () из нескольких потоков? - если да, то ... не будет ли это проблемой, поскольку контекст не является потокобезопасным)   -  person Adrian I    schedule 20.02.2020
comment
Хорошая уловка, это действительно небезопасно и должно быть какое-то взаимодействие сообщения с актером, например, с использованием SpawnProtocol.   -  person johanandren    schedule 20.02.2020
comment
Теперь я внес исправление, используя SpawnProtocol.   -  person johanandren    schedule 20.02.2020
comment
Я думаю, что проект chat-with-akka-http-wesockets - хороший ответ на это :)   -  person Adrian I    schedule 24.02.2020