Противодействие RxJava и количество обращений к производителю

Я пытаюсь создать бесконечную прокрутку в своем приложении для Android, используя обратное давление в rx Java. Я хочу, чтобы он вызывал внешнюю службу только запрошенное количество раз (после вызова request(1)). Но после использования плоской карты каждый subscribe загружает 16 страниц.

ниже моего кода с ожидаемыми результатами. Почти каждый тест не проходит из-за первого запроса (при n=16)

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {


    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // at subscribe rxJava makes request for 16 elements - probably because of flatMap
                // after first request with 16 elements everything seems to work fine even if i ignore the 'n' param

                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);

            });
        });
        return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }


    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }


    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given

        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);

        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();


        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);

    }


    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given

        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);


        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();


        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);

    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);

    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {

        //given

        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }

}

person dmnk_89    schedule 12.08.2015    source источник
comment
Будет ли расписание DataProvider.requestPage(int) работать с использованием потока, будущего, observeOn(Scheduler) или subscribeOn(Scheduler)?   -  person Aaron    schedule 12.08.2015
comment
да, я использую и subscribeOn, и observeOn   -  person dmnk_89    schedule 13.08.2015
comment
Ваш metaObservable неправильно реализует обратное давление и не завершается. По умолчанию flatMap/merge подписывается только на 16 Observables за раз и не будет двигаться дальше, пока они в конечном итоге не завершатся.   -  person akarnokd    schedule 13.08.2015


Ответы (1)


Обратное давление предназначено для согласования поведения одновременного потребителя и производителя и позволяет автору программы устанавливать стратегии для разрешения того, что делать, когда скорость производства данных превышает скорость потребления данных.

Тем не менее, вы увидите, что операторы, которые объединяют наблюдаемые, такие как merge, дадут вам запрошенную сумму, которая не соответствует количеству требуемых данных. Внешний наблюдаемый (наблюдаемый из наблюдаемых) всегда будет получать запрос на 16 на RxAndroid (128 на RxJava) при слиянии. Затем, когда он получает внутренние наблюдаемые из списка, каждая внутренняя наблюдаемая будет получать запрос, основанный на запрошенной сумме от нижестоящего подписчика. Если вы попытаетесь написать Observable<Observable<T>>, вам придется написать функцию OnSubscribe<Observable<List<T>>>, которая внутренне управляет поведением слияния, чтобы она была Observable<List<T>> вместо Observable<Observable<List<T>>. Написание этого заставит вас подписаться на наблюдаемую, возвращенную вашим поставщиком данных, чтобы развернуть и onNext List<T>.

Я предлагаю вместо этого сопоставить y-позиции экрана с событиями End-Of-Page, затем использовать сканирование для преобразования этого числа в монотонно возрастающее число, а затем concatMap этого числа в вызов DataProvider.requestPage().

screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    .scan(1, (event, pageNumber) -> pageNumber + 1 )
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);
person Aaron    schedule 12.08.2015
comment
на Android я считаю, что размер кольцевого буфера по умолчанию составляет 16, а не 128 - person Dave Moten; 13.08.2015
comment
Ах, это была моя теория. Спасибо Дэйв! - person Aaron; 13.08.2015
comment
Таким образом, слияние предварительно загружает данные, и нет возможности контролировать, сколько раз будет вызываться производитель? - person dmnk_89; 13.08.2015
comment
Слияние будет вызывать «producer.request(int)» с переменной скоростью, которая зависит от скорости, с которой производитель передает данные, и скорости, с которой цепочка операторов ниже слияния потребляет данные. observeOn также имеет буфер среди других операторов, каждый из которых использует обратное давление для заполнения и опустошения буферов. - person Aaron; 13.08.2015