Вопросы по теме 'project-reactor'

Терминальные вызовы Stream никогда не выполняются
Мне трудно использовать Spring Reactor Stream API (аналогичный rxjava) для создания объекта ответа в моих ответах на упаковку службы, предоставляемых двумя нижележащими службами. Ниже приведен метод accept() на моем потребителе канала. Некоторые...
226 просмотров
schedule 27.09.2021

Почему метод runOn () не выполняет оператор карты в следующем доступном потоке из пула, когда текущие выполняющиеся потоки переходят в состояние ожидания?
Я пытаюсь выполнить следующий код на четырехъядерном компьютере. У меня есть 5 потоков в пуле, а внутри оператора карты я на несколько секунд переводил выполняющийся поток в спящий режим. Я ожидаю, что ядро ​​переведет выполняющийся поток в спящий...
112 просмотров
schedule 29.11.2021

Как создать генератор случайных чисел с помощью Reactor?
Я хочу имитировать данные измерений, многократно выдавая случайные числа. Я попытался сделать это с помощью Reactor, но ничего не вышло: private static Random random = new Random(); public static void main(String[] args) throws...
1621 просмотров

Как убедиться, что поток реактора обрабатывает все предоставленные сообщения
Предположим, мы хотим, чтобы конвейер Flux обрабатывал все сообщения, поступающие из нескольких потоков. Рассмотрим код ниже: @Test public void testFluxCreate() throws InterruptedException { EmitterProcessor<String> processor =...
2889 просмотров

Как обеспечить завершение реактивного потока с помощью Spring WebFlux и WebSockets
Я написал тестовый клиент и сервер для Spring WebFlux на Kotlin. Клиент отправляет число на сервер (например, 4) и возвращает это количество чисел (например, 0, 1, 2 и 3). Вот реализация сервера: class NumbersWebSocketHandler : WebSocketHandler...
1310 просмотров

Как мне использовать StepVerifier Reactor, чтобы убедиться, что Mono пуст?
Я использую StepVerifier для проверки значений: @Test public void testStuff() { Thing thing = new Thing(); Mono<Thing> result = Mono.just(thing); StepVerifier.create(result).consumeNextWith(r -> { assertEquals(thing,...
13828 просмотров

Как я могу выполнить бесконечный опрос Kafka с помощью Reactor?
Каким реактивным способом реализовать бесконечный цикл опроса с помощью Reactor? В идеале я хотел бы отправлять сообщения из приложения-производителя, а приложение-потребитель должно бесконечно прослушивать и обрабатывать поток сообщений одинаково...
303 просмотров
schedule 01.10.2021

Преобразование моно при успехе операций
Я имею дело с Java-клиентом Cloudfoundry для следующего варианта использования: Я выполняю запрос, который возвращает Mono<Void> В случае успеха этого Mono я хочу выполнить необязательную операцию, которая возвращает...
536 просмотров
schedule 06.10.2021

Как объединить вложенный поток
Как объединить два потока с вложенным? Почему выполнение этого кода никогда не заканчивается? @Test fun `concatenating two flux`() { val names = listOf("israel", "israel") val a = Flux.just("a", "v") .flatMap {...
193 просмотров

Как обрабатывать список моно по одному?
У меня есть список как List<Tuple3<Object1, Mono<List<Object2>>, Mono<List<Object3>>>> И мне нужно написать метод с подписью как private Mono<Dto> buildDto(List<Tuple3<Object1,...
235 просмотров

Правильный способ последовательного выполнения двух запросов в r2dbc
Я работаю с R2DBC, и мне нужно выполнить запрос, который по запросу возвращает поток моих сущностей, и после этого мне нужно преобразовать эти сущности в DTO, но для создания DTO мне нужно сделать еще один запрос к базе данных для каждого объекта,...
353 просмотров

Как мне дождаться, пока несколько моно выполнятся одновременно, и получить значение
Похоже на вопрос Ожидание завершения выполнения экземпляров Reactor Mono но я хочу получить идеальный результат на другом моно. Вот код, который у меня есть. Я попробовал решение с материализацией, но это не сработало....
765 просмотров

Зависимые вызовы веб-клиентов - Spring Reactive
Я пытаюсь выполнить два вызова API, второй вызов API зависит от первого ответа API. Следующий фрагмент кода дает ответ на первый вызов веб-клиента. Здесь я не получаю ответа от второго вызова API. В журнале я мог видеть, что запрос на второй вызов...
1940 просмотров

Как пропустить старые сообщения при подключении к производителю RabbitMQ
Я изучил политики истечения срока действия и TTL для сообщений и очередей, но не уверен, что это лучший способ выполнить то, что я пытаюсь сделать. В идеале, когда мой потребитель подключается к моему отправителю, я хочу пропустить все старые,...
146 просмотров

Разница между Mono.then и Mono.flatMap / map
Скажем, я хочу вызвать webservice1, а затем вызвать webservice2, если первый был успешным. Я могу сделать следующее (только ориентировочный псевдокод): - Mono.just(reqObj) .flatMap(r -> callServiceA()) .then(() -> callServiceB()) or...
718 просмотров

Как передать контекст нижестоящим операторам в Project Reactor?
Мне нужно передать некоторый контекст нижестоящим операторам в Project Reactor, но похоже, что они позволяют делать это только снизу вверх, как описано в документации . Есть ли способ передать его от восходящих операторов к нисходящим? Я могу...
292 просмотров
schedule 25.11.2021

Reactor: как получить плоский кортеж после использования zipWhen с другим кортежем?
Когда я связываю несколько вызовов zipWhen , результатом будет Tuble2<Tuple2<Foo, Bar>, Bam> вместо Tuple3<Foo, Bar, Bam> . Это ухудшается с каждым последующим zipWhen . Пример: val getFoo() .zipWhen { foo ->...
153 просмотров

Слияние двух Mono и получение Flux. Затем извлечение Mono из этого потока
У меня есть два Mono<T> , которые я получил из двух разных источников , скажем KAFKA . Я намерен объединить оба Mono в Flux<T> . 1 Затем используйте метод public final Mono<T> reduce(BiFunction<T,T,T>...
54 просмотров

Как выполнить вычисление движущегося окна для потока и вывести результат в виде нового потока
Я хотел бы выполнить расчет движущегося окна для потока и создать поток, содержащий вычисленные значения, но я не могу понять, как это сделать. В качестве упрощенного примера, скажем, у меня есть поток целых чисел, и я хочу создать новый поток с...
289 просмотров
schedule 22.02.2022

Для чего нужен интерфейс Fuseable в проекте Reactor?
В исходном коде Reactor много случаев использования интерфейса Fuseable, но я не могу найти никакой ссылки, что это такое. Может ли кто-нибудь объяснить его цель?
278 просмотров
schedule 18.02.2022