Предположим, мы хотим, чтобы конвейер Flux обрабатывал все сообщения, поступающие из нескольких потоков. Рассмотрим код ниже:
@Test
public void testFluxCreate() throws InterruptedException {
EmitterProcessor<String> processor = EmitterProcessor.create();
CountDownLatch latch = new CountDownLatch(1);
AtomicLong counter = new AtomicLong();
AtomicLong batch = new AtomicLong();
Flux<List<String>> flux = processor
.doOnSubscribe(ss -> System.out.println(nm() + " : subscribing to + ss))
.onBackpressureError()
.buffer(7)
.publishOn(Schedulers.immediate())
.doOnNext(it -> {
counter.addAndGet(it.size());
System.out.println(batch.incrementAndGet() + " : " + nm() + "Batch: " + it.size());
})
;
CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
IntStream.range(1, 1001).forEach(it -> {
//sleep();
processor.onNext("Message-" + it);
});
});
CompletableFuture<Void> producer2 = CompletableFuture.runAsync(() -> {
IntStream.range(1, 1001).forEach(it -> {
//sleep();
processor.onNext("Message2-" + it);
});
});
CompletableFuture<Void> future = CompletableFuture.allOf(producer, producer2).thenAccept(it -> processor.onComplete());
flux.doOnComplete(latch::countDown).subscribe();
future.join();
latch.await();
System.out.println("Total: " + counter);
}
Счетчик показывает нам, что каждый раз, когда мы выполняем этот код, фактическое количество обработанных сообщений меняется. Что не так с этой реализацией? Как мы можем гарантировать, что все сообщения были обработаны до завершения программы?
.onBackpressureError()
намеренно отбрасывает события, когда они не обрабатываются достаточно быстро. Вы в курсе? - person Jens Schauder   schedule 09.11.2017