Ошибка канала RSocket: Reaction.core.publisher.Operators.error - Оператор вызвал по умолчанию onErrorDropped с объединенным потоком

Я хочу создать канал rsocket, где данные, отправленные с сервера, могут быть либо реакцией на запрос клиента, либо push. Для этого я использую слияние потоков.

Это ссылочные данные: обновление может быть запрошено клиентом, и сервер также может отправлять обновления.

Итак, у меня есть это на стороне сервера:

    @MessageMapping("update-stream")
    Flux<DomainObject> addUpdatesListener(Flux<RefreshRequest> requests) {
        Flux<DomainObject> pushFlux = Flux.from(this.flux)
            .doOnError((e) -> log.error("Error on push flux : {}", e, e));
        return requests
                .map(this::getUpdates)
                .flatMap(Flux::fromIterable)
                .doOnError((e) -> log.error("Error on channel flux : {}", e, e))
                .mergeWith(pushFlux)
                .doOnError((e) -> log.error("Error on merged flux : {}", e, e));
    }

Он работает, за исключением того, что когда я останавливаю клиента, у меня возникает следующая ошибка:

06-07-2020 15:58:53.168 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
    at reactor.core.publisher.FluxProcessor.dispose(FluxProcessor.java:80)
    at io.rsocket.core.RSocketResponder$3.hookOnCancel(RSocketResponder.java:513)
    at reactor.core.publisher.BaseSubscriber.cancel(BaseSubscriber.java:230)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:275)
    at io.rsocket.core.RSocketResponder.cleanup(RSocketResponder.java:265)
    at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:167)
    at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:160)
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:132)
    at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
    at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
    at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:60)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)  

Если я не сделаю слияние, у меня не будет ошибок.

Я пробовал много разных версий, но не могу найти способ, чтобы при выходе клиента одновременно нажимали и не регистрировали ошибку.

Что мне не хватает?

Спасибо большое.


person Raphaël Lemaire    schedule 06.07.2020    source источник


Ответы (1)


Проблема исчезает при обновлении с spring -boot 2.3.0.RELEASE до 2.3.1.RELEASE.

person Raphaël Lemaire    schedule 06.07.2020