Spring websocket отправляет сообщение из нескольких потоков

Я использую реализацию сервера Spring WebSocket для одного из моих проектов на основе Spring. Я столкнулся с ошибкой The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state. Я обнаружил, что проблема заключается в одновременной записи в веб-сокет из разных потоков.

Как я это временно исправил: считайте, что я реализовал метод ниже

void sendMessageToSession(WebsocketSession session,String message);

который отправляет TextMessage в сеанс веб-сокета. Я не могу синхронизировать весь этот метод, потому что несколько потоков могут вызывать его для разных сеансов websocketSessions и сообщений. Я также не могу поместить сеанс в синхронизированный блок (пробовал и не работал)

Хотя я исправил свою проблему вот так

synchronized(session.getId()){ 
    //sending message;
}

и я больше не сталкивался с этой проблемой. Но использование строк в синхронизированных блоках не является хорошей практикой. Итак, какие еще решения у меня есть? как лучше всего отправлять асинхронные сообщения?

PS: я уже использовал ConcurrentWebSocketSessionDecorator после установления соединения и использую обновленный веб-сокет. не помогло.

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

ПРИМЕЧАНИЕ. Я сохраняю сеансы websocet на карте, где ключ - session.getId, а значение - сам сеанс.

В отличие от некоторых других реализаций веб-сокетов, ссылки на веб-сокеты Spring не кажутся равными для каждого сообщения. Я сохранял сеансы на карте по их идентификатору, и в каждом сообщении я проверяю равенство переданного веб-сокета с веб-сокетом, который я уже поместил на свою карту, его ложь.


person Sep GH    schedule 04.02.2018    source источник
comment
Насколько я понимаю, synchronized(session.getId()) не может решить вашу проблему ..   -  person xingbin    schedule 04.02.2018
comment
@ user27149 ну, теперь я не сталкиваюсь с какими-либо исключениями, когда я использую его, и система работает нормально, поэтому я могу сказать, что она действительно решила мою проблему (временно, потому что я задал этот вопрос, чтобы найти правильный способ ее решения)   -  person Sep GH    schedule 04.02.2018
comment
Да, я понимаю, вы ищете лучшее решение ...   -  person xingbin    schedule 04.02.2018
comment
Что происходит, когда вы пытаетесь синхронизироваться (сеанс)?   -  person Warren Dew    schedule 04.02.2018
comment
@WarrenDew все еще та же ошибка. Также обратитесь к моей заметке   -  person Sep GH    schedule 05.02.2018


Ответы (1)


Добавив volatile ключевое слово за моим WebsocketSession в том месте, где я сохраняю свои сеансы, я решил проблему. Я был бы рад узнать, если это тоже плохая практика. Но моя идея заключается в том, что при записи в сеанс веб-сокета из нескольких потоков эти потоки теряют состояние веб-сокета, потому что он еще не обновлен, и поэтому возникает это исключение.

Добавляя volatile, мы гарантируем, что состояние веб-сокета было обновлено до того, как его использует другой поток, поэтому запись в веб-сокет работает синхронизированно, как и ожидалось.

Я создал класс с именем SessionData, который содержит websocketSession и все другие данные, которые мне нужны о сеансе.

public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}

и я использовал SessionData как значение карты, где идентификаторы сеанса являются ключами

затем при получении websocketSession из SessionData и записи в него из разных потоков volatile помог мне получить обновленный websocketSession.


Обновление (2020)

Одно ключевое замечание здесь заключается в том, что вы должны использовать sessionData.getWebsocketSession.sendMessage(...) каждый раз, когда хотите отправить сообщение в сеанс. Никогда не следует использовать сеанс напрямую, что означает, что использование кода типа this является плохой практикой:

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

Вы никогда не узнаете, какие изменения были применены к сеансу websocket между этими двумя строками кода (в вашем случае их может быть больше 2).

Также никогда не публикуйте напрямую в сеансах, которые передаются вам внутри Spring websocket MessageHandlers. В противном случае вы, вероятно, снова получите эту ошибку.

Вот почему рекомендуется отображать sessionId из WebSocketSession в SessionData при открытии соединения. Вы можете использовать этот репозиторий для получения volatile session с использованием идентификатора сеанса вместо прямого использования сеанса.

person Sep GH    schedule 05.02.2018
comment
Вау, спасибо! Это то, что я искал. Есть ли существенная разница между маркировкой volatile всей карты (где идентификаторы сеанса являются ключами) и маркировкой volatile объекта сеанса внутри значения карты (класс SessionData)? - person Nikolai Shevchenko; 21.05.2018
comment
@NikolayShevchenko Я предлагаю вам поставить его за сеансом в классе sessionData. Просто подумав о концепции, я должен сказать, что это означает, что мы получаем доступ к последнему состоянию websocketSession после того, как мы получили его с карты. Но если вы поместите его за карту, вы получите доступ к последнему состоянию карты, которое ничего не гарантирует об объектах внутри карты. Просто убедитесь, что вы используете ConcurrentHashMap. - person Sep GH; 22.05.2018