Вопросы по теме 'project-reactor'
Терминальные вызовы Stream никогда не выполняются
Мне трудно использовать Spring Reactor Stream API (аналогичный rxjava) для создания объекта ответа в моих ответах на упаковку службы, предоставляемых двумя нижележащими службами.
Ниже приведен метод accept() на моем потребителе канала. Некоторые...
226 просмотров
schedule
27.09.2021
Почему метод runOn () не выполняет оператор карты в следующем доступном потоке из пула, когда текущие выполняющиеся потоки переходят в состояние ожидания?
Я пытаюсь выполнить следующий код на четырехъядерном компьютере. У меня есть 5 потоков в пуле, а внутри оператора карты я на несколько секунд переводил выполняющийся поток в спящий режим.
Я ожидаю, что ядро переведет выполняющийся поток в спящий...
112 просмотров
schedule
29.11.2021
Как создать генератор случайных чисел с помощью Reactor?
Я хочу имитировать данные измерений, многократно выдавая случайные числа. Я попытался сделать это с помощью Reactor, но ничего не вышло:
private static Random random = new Random();
public static void main(String[] args) throws...
1621 просмотров
schedule
21.09.2021
Как убедиться, что поток реактора обрабатывает все предоставленные сообщения
Предположим, мы хотим, чтобы конвейер Flux обрабатывал все сообщения, поступающие из нескольких потоков. Рассмотрим код ниже:
@Test
public void testFluxCreate() throws InterruptedException {
EmitterProcessor<String> processor =...
2889 просмотров
schedule
07.11.2021
Как обеспечить завершение реактивного потока с помощью Spring WebFlux и WebSockets
Я написал тестовый клиент и сервер для Spring WebFlux на Kotlin. Клиент отправляет число на сервер (например, 4) и возвращает это количество чисел (например, 0, 1, 2 и 3). Вот реализация сервера:
class NumbersWebSocketHandler : WebSocketHandler...
1310 просмотров
schedule
18.10.2021
Как мне использовать StepVerifier Reactor, чтобы убедиться, что Mono пуст?
Я использую StepVerifier для проверки значений:
@Test
public void testStuff() {
Thing thing = new Thing();
Mono<Thing> result = Mono.just(thing);
StepVerifier.create(result).consumeNextWith(r -> {
assertEquals(thing,...
13828 просмотров
schedule
11.09.2021
Как я могу выполнить бесконечный опрос Kafka с помощью Reactor?
Каким реактивным способом реализовать бесконечный цикл опроса с помощью Reactor? В идеале я хотел бы отправлять сообщения из приложения-производителя, а приложение-потребитель должно бесконечно прослушивать и обрабатывать поток сообщений одинаково...
303 просмотров
schedule
01.10.2021
Преобразование моно при успехе операций
Я имею дело с Java-клиентом Cloudfoundry для следующего варианта использования:
Я выполняю запрос, который возвращает Mono<Void>
В случае успеха этого Mono я хочу выполнить необязательную операцию, которая возвращает...
536 просмотров
schedule
06.10.2021
Как объединить вложенный поток
Как объединить два потока с вложенным? Почему выполнение этого кода никогда не заканчивается?
@Test
fun `concatenating two flux`() {
val names = listOf("israel", "israel")
val a = Flux.just("a", "v")
.flatMap {...
193 просмотров
schedule
02.12.2021
Как обрабатывать список моно по одному?
У меня есть список как
List<Tuple3<Object1, Mono<List<Object2>>, Mono<List<Object3>>>>
И мне нужно написать метод с подписью как
private Mono<Dto> buildDto(List<Tuple3<Object1,...
235 просмотров
schedule
21.10.2021
Правильный способ последовательного выполнения двух запросов в r2dbc
Я работаю с R2DBC, и мне нужно выполнить запрос, который по запросу возвращает поток моих сущностей, и после этого мне нужно преобразовать эти сущности в DTO, но для создания DTO мне нужно сделать еще один запрос к базе данных для каждого объекта,...
353 просмотров
schedule
18.10.2021
Как мне дождаться, пока несколько моно выполнятся одновременно, и получить значение
Похоже на вопрос Ожидание завершения выполнения экземпляров Reactor Mono но я хочу получить идеальный результат на другом моно. Вот код, который у меня есть. Я попробовал решение с материализацией, но это не сработало....
765 просмотров
schedule
27.11.2021
Зависимые вызовы веб-клиентов - Spring Reactive
Я пытаюсь выполнить два вызова API, второй вызов API зависит от первого ответа API. Следующий фрагмент кода дает ответ на первый вызов веб-клиента. Здесь я не получаю ответа от второго вызова API. В журнале я мог видеть, что запрос на второй вызов...
1940 просмотров
schedule
18.09.2021
Как пропустить старые сообщения при подключении к производителю RabbitMQ
Я изучил политики истечения срока действия и TTL для сообщений и очередей, но не уверен, что это лучший способ выполнить то, что я пытаюсь сделать.
В идеале, когда мой потребитель подключается к моему отправителю, я хочу пропустить все старые,...
146 просмотров
schedule
12.11.2021
Разница между Mono.then и Mono.flatMap / map
Скажем, я хочу вызвать webservice1, а затем вызвать webservice2, если первый был успешным.
Я могу сделать следующее (только ориентировочный псевдокод): -
Mono.just(reqObj)
.flatMap(r -> callServiceA())
.then(() -> callServiceB())
or...
718 просмотров
schedule
23.09.2021
Как передать контекст нижестоящим операторам в Project Reactor?
Мне нужно передать некоторый контекст нижестоящим операторам в Project Reactor, но похоже, что они позволяют делать это только снизу вверх, как описано в документации .
Есть ли способ передать его от восходящих операторов к нисходящим? Я могу...
292 просмотров
schedule
25.11.2021
Reactor: как получить плоский кортеж после использования zipWhen с другим кортежем?
Когда я связываю несколько вызовов zipWhen , результатом будет Tuble2<Tuple2<Foo, Bar>, Bam> вместо Tuple3<Foo, Bar, Bam> . Это ухудшается с каждым последующим zipWhen .
Пример:
val getFoo()
.zipWhen { foo ->...
153 просмотров
schedule
01.11.2021
Слияние двух Mono и получение Flux. Затем извлечение Mono из этого потока
У меня есть два Mono<T> , которые я получил из двух разных источников , скажем KAFKA .
Я намерен объединить оба Mono в Flux<T> . 1
Затем используйте метод public final Mono<T> reduce(BiFunction<T,T,T>...
54 просмотров
schedule
07.10.2021
Как выполнить вычисление движущегося окна для потока и вывести результат в виде нового потока
Я хотел бы выполнить расчет движущегося окна для потока и создать поток, содержащий вычисленные значения, но я не могу понять, как это сделать.
В качестве упрощенного примера, скажем, у меня есть поток целых чисел, и я хочу создать новый поток с...
289 просмотров
schedule
22.02.2022
Для чего нужен интерфейс Fuseable в проекте Reactor?
В исходном коде Reactor много случаев использования интерфейса Fuseable, но я не могу найти никакой ссылки, что это такое. Может ли кто-нибудь объяснить его цель?
278 просмотров
schedule
18.02.2022