Как убедиться, что поток реактора обрабатывает все предоставленные сообщения

Предположим, мы хотим, чтобы конвейер Flux обрабатывал все сообщения, поступающие из нескольких потоков. Рассмотрим код ниже:

@Test
public void testFluxCreate() throws InterruptedException {
    EmitterProcessor<String> processor = EmitterProcessor.create();
    CountDownLatch latch = new CountDownLatch(1);

    AtomicLong counter = new AtomicLong();
    AtomicLong batch = new AtomicLong();
    Flux<List<String>> flux = processor
            .doOnSubscribe(ss -> System.out.println(nm() + " : subscribing to + ss))
            .onBackpressureError()
            .buffer(7)
            .publishOn(Schedulers.immediate())
            .doOnNext(it -> {
                counter.addAndGet(it.size());
                System.out.println(batch.incrementAndGet() + " : " + nm() + "Batch: " + it.size());
            })
            ;

    CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
        IntStream.range(1, 1001).forEach(it -> {
            //sleep();
            processor.onNext("Message-" + it);
        });
    });

    CompletableFuture<Void> producer2 = CompletableFuture.runAsync(() -> {
        IntStream.range(1, 1001).forEach(it -> {
            //sleep();
            processor.onNext("Message2-" + it);
        });
    });

    CompletableFuture<Void> future = CompletableFuture.allOf(producer, producer2).thenAccept(it -> processor.onComplete());

    flux.doOnComplete(latch::countDown).subscribe();

    future.join();
    latch.await();

    System.out.println("Total: " + counter);
}

Счетчик показывает нам, что каждый раз, когда мы выполняем этот код, фактическое количество обработанных сообщений меняется. Что не так с этой реализацией? Как мы можем гарантировать, что все сообщения были обработаны до завершения программы?


person mgulimonov    schedule 09.11.2017    source источник
comment
Еще не посмотрел должным образом, но .onBackpressureError() намеренно отбрасывает события, когда они не обрабатываются достаточно быстро. Вы в курсе?   -  person Jens Schauder    schedule 09.11.2017


Ответы (1)


Что не так с этой реализацией?

Когда я запускаю код, я сразу после запуска получаю в журналах следующее:

18:39:12.590 [ForkJoinPool.commonPool-worker-1] DEBUG reactor.core.publisher.Operators - Duplicate Subscription has been detected
java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
    at reactor.core.Exceptions.duplicateOnSubscribeException(Exceptions.java:162)
    at reactor.core.publisher.Operators.reportSubscriptionSet(Operators.java:502)
    at reactor.core.publisher.Operators.setOnce(Operators.java:607)
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:245)
    at de.schauder.reactivethreads.demo.StackoverflowQuicky.lambda$null$2(StackoverflowQuicky.java:54)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
    at de.schauder.reactivethreads.demo.StackoverflowQuicky.lambda$main$3(StackoverflowQuicky.java:52)

Я не знаком с EmitterProcessor, но кажется, что onNext не является потокобезопасным, и я сильно подозреваю, что это причина отсутствующих событий.

Как мы можем гарантировать, что все сообщения были обработаны до завершения программы?

Я бы использовал два отдельных Producers и merge. Также я думаю, вам не нужна защелка обратного отсчета.

public static void main(String[] args) {

    AtomicLong counter = new AtomicLong();
    AtomicLong batch = new AtomicLong();

    EmitterProcessor<String> processor1 = EmitterProcessor.create();
    EmitterProcessor<String> processor2 = EmitterProcessor.create();

    Thread thread1 = constructThread(processor1);
    Thread thread2 = constructThread(processor2);


    Flux<List<String>> flux = processor1.mergeWith(processor2)
            .buffer(7)
            .onBackpressureError()
            .publishOn(Schedulers.immediate())
            .doOnNext(it -> {
                counter.addAndGet(it.size());
                System.out.println(batch.incrementAndGet() + " : Batch: " + it.size());
            }).doOnComplete(() -> {
                System.out.println("Total count: " + counter.get());
            });

    thread1.start();
    thread2.start();

    flux.blockLast();
}

private static Thread constructThread(EmitterProcessor<String> processor) {
    return new Thread(() -> {
        IntStream.range(1, 1001).forEach(it -> {
            processor.onNext("Message2-" + it);
        });
        processor.onComplete();
    });
}

Примечание к моему комментарию:

onBackpressureError() заставляет Flux выдавать ошибку, когда подписчик не может обработать все события достаточно быстро, поэтому это может объяснить несоответствие, но вы увидите исключение.

person Jens Schauder    schedule 09.11.2017
comment
Я не вижу никаких исключений, поэтому кажется, что противодавление - это не тот случай. Я проверил ваше предложение о потокобезопасности onNext, и вы были правы! Спасибо! Следующим шагом будет определение, как решить задачу с одним подписчиком и несколькими продюсерами неблокирующим способом =) Конечная цель - понять, как этого добиться с помощью проекта-реактора. - person mgulimonov; 10.11.2017
comment
blockLast существует только для того, чтобы фактически дождаться конца Flux. Так что это действительно необходимо только для того, чтобы приложение не закончилось. Точно так же, как обратный отсчет, но более идиоматично. - person Jens Schauder; 10.11.2017