Получение и освобождение подключений с помощью пула подключений TcpClient (Reactor Netty)

Я настроил TcpClient, как описано в примерах, здесь. Я пытаюсь сделать следующий код устойчивым в ситуациях, когда сервер неожиданно закрывает соединение:

TcpClient tcpClient = getTcpClient();

public Mono<String> sendMessage(Mono<bytes[]> request) {
    Connection connection = getConnectionFromPool(tcpClient);
    return connection
        .outbound()
        .sendByteArray(request)
        .then()
        .then(connection.inbound().receive().asString().as(Mono::from));
}

В таком случае я ожидаю, что метод getConnectionFromPool сможет получить соединение из пула или открыть новое, если оно недоступно.

Заметив, что .connect () в конечном итоге переключается на ConnectionProvider.acquire (), я попытался использовать tcpClient.connect (), но возникла необходимость изменить тип возвращаемого значения метода следующим образом:

public Mono<Mono<String>> sendMessage1(Mono<String> request) {
    return this.tcpClient
        .connect()
        .map(connection ->
            connection
                .outbound()
                .sendByteArray(request.map(String::getBytes))
                .then()
                .then(connection.inbound().receive().asString().as(Mono::from))
            );
}

Ясно, что это нежелательно. Как получить экземпляр подключения непосредственно из пула? Есть ли простой оператор Mono, который мне не хватает, или я неправильно использую API TcpClient?

Большое спасибо за любую помощь.


person Francisco Z.    schedule 30.06.2020    source источник


Ответы (1)


А как насчет использования flatMap вместо map?

public Mono<String> sendMessage1(Mono<String> request) {
    return this.tcpClient
            .connect()
            .flatMap(connection ->
                    connection
                            .outbound()
                            .sendByteArray(request.map(String::getBytes))
                            .then()
                            .then(connection.inbound().receive().asString().as(Mono::from))
            );
}
person Violeta Georgieva    schedule 01.07.2020
comment
Большое спасибо, это решает мою проблему с плохим типом возврата :) Ошибка новичка. Я продвинулся в своем тестировании, но все еще сталкиваюсь с одной проблемой. Опять же, моя цель - иметь возможность получать и повторно использовать подключения из пула. В настоящее время с пулом, содержащим 1 соединение, я не могу использовать соединение более одного раза. Ничто в журналах прослушивания не указывает на то, что соединение закрывается. Я неправильно освобождаю соединение с пулом? Есть ли способ освободить соединение с пулом без закрытия соединения? В очередной раз благодарим за помощь. - person Francisco Z.; 01.07.2020