Обнаружение пустого окна Flux перед публикацией веб-клиента

Что касается моего предыдущего вопроса Разделение сообщения WebClient потокового потока в массивы JSON, которые я использовал;

myFlux
 .window(5)
 .flatMap(window -> client
  .post()
  .body(window, myClass.class)
  .exchange()
  .flatMap(response -> response.bodyToMono)
 )
 .subscribe();

Это нормально работает. Однако в медленный день доставка 5 сообщений может занять некоторое время, и window не будет отправлять ничего, пока window не заполнится. Поэтому я перешел на windowTimeout(5, Duration.ofSeconds(5)).

Теперь, если данных нет и Duration превышено, код распространяет пустой window, что вызывает отправку пустого массива.

Как мне обнаружить пустой window и не запустить post?


person lafual    schedule 04.06.2019    source источник
comment
ты можешь сделать window.map(value -> client.post()...syncBody(value))? Это заглянет в окно Flux, и если там ничего нет, ничего не произойдет.   -  person Frischling    schedule 05.06.2019


Ответы (1)


К сожалению, невозможно узнать, сколько элементов будет выдано Flux, не прочитав весь Flux до конца.

Поскольку размер вашего окна относительно невелик, вы можете собрать все элементы, испускаемые Flux, в List с помощью .collectList(), а затем проверить, пуст ли список, перед отправкой запроса.

myFlux
    .windowTimeout(5, Duration.ofSeconds(5))
    .flatMap(window ->
        // collect everything in the window into a list
        window.collectList()
             // ignore empty windows
            .filter(list -> !list.isEmpty())
             // send the request
            .flatMap(list -> client
                .post()
                .body(Flux.fromIterable(list), MyClass.class)
                .exchange()
                .flatMap(response -> response.bodyToMono(MyResponse.class))))

person Phil Clay    schedule 05.06.2019
comment
@ Фил снова приходит на помощь! Изменение нанесло серьезный ущерб моему редактору, поэтому я разбил его на filteredFlux = myFlux.windowTimeout().flatMap(w.collectList().filter()), затем filterFlux.flatMap(list -> client.post()) - person lafual; 06.06.2019