Проверить собственный оператор Mutiny

Я новичок в реактивном программировании и создал свой первый собственный оператор Mutiny, как описано в https://smallrye.io/smallrye-mutiny/guides/plug.

Мой оператор - сборщик, который собирает x элементов в списке и отправляет их, если x достигается или когда поток завершен.

public class ListCollectorUntil<T> extends AbstractMultiOperator<T, List<T>> {

ListCollectorUntil(Multi<? extends T> upstream) {
    super(upstream);
}

public ListCollectorUntil(Multi<? extends T> upstream, int count) {
    this(upstream);
    this.count = count;
}

private int count;
private final List<T> bufferList = new ArrayList<>();
private final List<T> tempBufferList = new ArrayList<>();

@Override
public void subscribe(MultiSubscriber<? super List<T>> downstream) {
    upstream.subscribe().withSubscriber(new ListCollectorProcessor(downstream));
}

private class ListCollectorProcessor extends MultiOperatorProcessor<T, List<T>> {

    public ListCollectorProcessor(MultiSubscriber<? super List<T>> downstream) {
        super(downstream);
    }

    @Override
    public void onItem(T item) {
        bufferList.add(item);
        if (bufferList.size() >= count) {
            this.downstream.onItem(getUni());
        }
    }

    @Override
    public void onCompletion() {
        Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (subscription != Subscriptions.CANCELLED) {
            this.downstream.onItem(getUni());
            this.downstream.onCompletion();
        }
    }

    private List<T> getUni() {
        tempBufferList.clear();
        tempBufferList.addAll(bufferList);
        bufferList.clear();
        return tempBufferList;
    }
}

Когда я начал тестировать его со следующим кодом:

void buffer_after5Inputs_uniShouldBeReturned() {
    Multi<List<Integer>> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6)
            .plug( integerMulti -> new Buffer<>(integerMulti, 4));
    multi.subscribe().with(
            integerList -> System.out.println(integerList),
            failur -> System.out.println(failur),
            () -> System.out.println("completed")
    );
    AssertSubscriber<List<Integer>> assertSubscriber =
            multi.subscribe().withSubscriber(AssertSubscriber.create(2))
                    .assertCompleted()
            .assertItems(Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6));
}

ОТ моего первого подписчика я получаю: [1, 2, 3, 4] [5, 6] завершено. Но Assertsubscriber не получает никаких событий, ни onItem, ни onCompleted. Я где-то ошибся?


person jzimmerli    schedule 12.03.2021    source источник


Ответы (1)


Через некоторое время я понял, какую ошибку совершил:

Я неправильно использовал AssertSubscriber: create (int) описывает, сколько событий запрашивается у эмиттера. В моем случае мне нужно 6 событий

Я не делал глубокую копию своего списка, скорее изменил его на предыдущий список с моими новыми ценностями. Это похоже на то, что событие не записывается правильно, но значения списка, на который указывает событие, изменены. Поэтому отображаются правильные значения. Я изменил свой код с:

 private List<T> getUni() {
    tempBufferList.clear();
    tempBufferList.addAll(bufferList);
    bufferList.clear();
    return tempBufferList;

to

private List<T> getUni() {
        List<T> tempBufferList = new ArrayList<>(bufferList);
        bufferList = new ArrayList<>();
        return tempBufferList;
    }

И мой буфер списка работает нормально, и тест выполняется правильно.

person jzimmerli    schedule 13.03.2021