Предположим, у меня есть 3 издателя и 1 процессор. Издатели выпускают элементы в форме {key: <integer>, value: <object>, publisher_id: <string>}
.
Издатели выполняют операции ввода-вывода, поэтому:
- С одной стороны, я бы хотел, чтобы издатели работали (примерно) над
N
предметами в данный момент. - С другой стороны, я бы хотел, чтобы потребитель объединял элементы в одну запись (например,
{key: <integer>, values: <list>}
)
На самом деле я уже реализовал FluxProcessor
с внутренним хранилищем (ConcurrentHashMap
) для хранения всех элементов. Он вручную создает request()
новых элементов всякий раз, когда CAPACITY не была достигнута.
Я хотел бы знать, есть ли встроенные функции для этого с RxJava(2)/Spring Reactor API?