Реактивные потоки: как дождаться всех издателей по ключу?

Предположим, у меня есть 3 издателя и 1 процессор. Издатели выпускают элементы в форме {key: <integer>, value: <object>, publisher_id: <string>}.

Издатели выполняют операции ввода-вывода, поэтому:

  • С одной стороны, я бы хотел, чтобы издатели работали (примерно) над N предметами в данный момент.
  • С другой стороны, я бы хотел, чтобы потребитель объединял элементы в одну запись (например, {key: <integer>, values: <list>})

На самом деле я уже реализовал FluxProcessor с внутренним хранилищем (ConcurrentHashMap) для хранения всех элементов. Он вручную создает request() новых элементов всякий раз, когда CAPACITY не была достигнута.

Я хотел бы знать, есть ли встроенные функции для этого с RxJava(2)/Spring Reactor API?


person yaseco    schedule 23.06.2019    source источник


Ответы (1)


Используйте merge, rebatchRequests и toMultimap с RxJava 2:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));
person akarnokd    schedule 27.06.2019
comment
Спасибо акарнокд! - person yaseco; 28.06.2019
comment
Я должен признать, что на самом деле это не то, что я хотел, так как toMultiMap - это раковина, которая заканчивает Flowable - person yaseco; 30.06.2019
comment
Что вы имеете в виду, когда заканчиваете Текучесть. Вы можете преобразовывать типы. - person akarnokd; 01.07.2019
comment
Вы случайно не знаете, что такое эквивалент в Spring Reactor? - person yaseco; 08.07.2019