Я пытаюсь реализовать бесконечную прокрутку в своём приложении для Android, используя обратное давление (backpressure) в RxJava. Я хочу, чтобы внешняя служба вызывалась ровно столько раз, сколько было запрошено (после вызова request(1)). Но после применения flatMap каждый вызов subscribe загружает 16 страниц.
Ниже приведён мой код с ожидаемыми результатами. Почти все тесты не проходят из-за первого запроса (с n=16).
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;
/**
 * Tests for a paged {@link Observable} that is intended to call a {@link DataProvider}
 * once for every page the downstream subscriber requests.
 */
public class ServiceObservablesTest {

    /**
     * Builds an observable that emits the pages returned by {@code dataProvider}, starting
     * at page 0.
     *
     * <p>A fresh page counter is created for every subscription, so each subscriber starts
     * from page 0. The stream completes at the first empty page, which is not emitted.
     *
     * @param dataProvider source of the pages
     * @param <T> element type of a page
     * @return an observable of pages
     */
    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservable = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // Exactly one page is requested per call to this producer; 'n' is ignored.
                // At subscribe time this producer has been observed receiving a request for
                // 16 elements - probably because flatMap asks its upstream for a batch of
                // inner observables regardless of the downstream demand.
                // NOTE(review): after that first request everything seems to work even
                // though 'n' is ignored - confirm against the RxJava version in use.
                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);
            });
        });
        return metaObservable.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    /**
     * Source of paged data.
     *
     * @param <T> element type of a page
     */
    public interface DataProvider<T> {
        /**
         * Requests a single page.
         *
         * @param page zero-based page number
         * @return an observable emitting the content of the page
         */
        Observable<List<T>> requestPage(int page);
    }

    /** Mocked provider; by default every page contains a single element. */
    private DataProvider<Object> provider;

    @Before
    @SuppressWarnings("unchecked") // Mockito.mock(Class) can only hand back the raw type
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    /** Subscribing with an initial demand of 1 should load page 0 and nothing else. */
    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }

    /** Requesting N pages after subscribing with no demand should load exactly pages 0..N-1. */
    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given
        int requestedPages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requestedPages);

        //then
        subscriber.assertValueCount(requestedPages);
        subscriber.assertNotCompleted();
        for (int i = 0; i < requestedPages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requestedPages);
    }

    /** An empty page should complete the stream and stop any further page requests. */
    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given
        int emptyPage = 2;
        // Typed local keeps the generic inference of Observable.just(...) unambiguous.
        List<Object> noResults = emptyList();
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(noResults));
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();
        // Page 0 is requested at subscribe; every page up to and including the empty one
        // must have been requested exactly once.
        for (int i = 0; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);
    }

    /** Each additional request(1) should load exactly one more page. */
    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);
    }

    /** Every subscriber should page through the data independently of the others. */
    @Test
    public void shouldWorkWithMultipleSubscribers() {
        //given
        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);

        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);

        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }
}
Работает ли DataProvider.requestPage(int) с использованием потока (Thread), Future, observeOn(Scheduler) или subscribeOn(Scheduler)? - person Aaron   schedule 12.08.2015
subscribeOn и observeOn - person dmnk_89   schedule 13.08.2015