JavaScript

Работа с Axios и RxJS для создания простой библиотеки HTTP

Простой HTTP-модуль, основанный на наблюдаемом RxJS, который может прервать XHR при отписке.

RxJS - лучшая библиотека для обработки потоков данных и использования различных фильтров для преобразования данных, а Axios - одна из лучших библиотек для обработки кроссбраузерные запросы Ajax.

Если вы не знаете о RxJS или Axios, вам следует взглянуть на их документацию, потому что я не собираюсь здесь подробно объяснять их работу. Вам также следует клонировать репозиторий js-plugin-starter и установить как rxjs, так и axios , чтобы настроить простое приложение.

В любом случае, давайте немного поговорим о RxJS. RxJS вращается вокруг трех концепций, а именно. наблюдаемый, наблюдатель и подписка. Наблюдаемый - это объект, который передает данные с течением времени, наблюдатель прослушивает эти данные, а подписка - это договор между этими двумя сторонами, который может быть нарушен.

наблюдаемое можно создать с помощью Observable.create метода, который принимает функцию с аргументом observer. Мы можем вызвать метод next, error или complete для объекта observer. Этот метод возвращает наблюдаемый, на который мы позже можем подписаться и выполнять обратные вызовы при поступлении данных.

Мы можем подписаться на наблюдаемое, используя для него subscribe метод. Этот метод ожидает наблюдателя типа Observer , который представляет собой простой объект с клавишами next, error и complete, которые имеют значения функций. наблюдатель также может быть встроенными аргументами функции ( nextCb, errorCb, completeCb ). Этот метод возвращает подписку типа Subscription, которую мы можем использовать, чтобы отказаться от подписки на наблюдаемые с помощью метода unsubscribe. Мы также можем вызвать метод next, error или complete на subscription для передачи данных наблюдателю извне.

Все это можно продемонстрировать на простом примере.

let observable$ = Observable.create( ( observer ) => {
    // internal interaction with observable
    observer.next( 1 );
    observer.next( 2 );
    observer.next( 3 );
    
    //==> observer.error( 'error-message' );
    //==> observer.complete();
} );
let observer = {
    next: data => console.log( '[data] => ', data ),
    complete: data => console.log( '[complete]' ),
};
let subscription = observable$.subscribe( observer );
// outside interaction with observable
subscription.next( 4 );
subscription.next( 5 );
subscription.complete();

Мы можем выполнить простой HTTP-запрос внутри Observable.create и вызвать метод next, когда он успешно разрешится, или вызвать метод error при возникновении какой-либо ошибки. Давайте воспользуемся для этого axios библиотекой. Я собираюсь использовать публичный API jsonplaceholder для этой демонстрации.

import { Observable } from 'rxjs';
import axios from 'axios';
let observable$ = Observable.create( ( observer ) => {
    axios.get( 'https://jsonplaceholder.typicode.com/users' )
    .then( ( response ) => {
        observer.next( response.data );
        observer.complete();
    } )
    .catch( ( error ) => {
        observer.error( error );
    } );
} );
let subscription = observable$.subscribe( {
    next: data => console.log( '[data] => ', data ),
    complete: data => console.log( '[complete]' ),
} );

Вам может быть интересно, почему мы вызываем метод complete, когда он ничего не делает. Что ж, observable генерирует данные с течением времени, что делает их потоком данных. Когда запрос Ajax успешно разрешается, мы не ожидаем больше данных и поэтому вызываем complete. Любые последующие next или error вызовы подписчика будут игнорироваться, и наблюдатель автоматически отписывается от подписки. Метод error выполняет то же самое, что и ниже, поэтому вызов complete здесь не требуется. После complete вызов unsubscribe является избыточным.

Давайте посмотрим, как можно unsubscribe из наблюдаемого избежать побочных эффектов.

...
let subscription = observable$.subscribe( {
    next: data => console.log( '[data] => ', data ),
    complete: data => console.log( '[complete]' ),
} );
setTimeout( () => {
    subscription.unsubscribe();
} );

На этот раз в консоли ничего не будет. Когда мы подписались, был отправлен запрос Ajax. Но прежде чем мы получили ответ от XHR, мы отписались. Этот запрос был успешно разрешен, и были также вызваны методы next и complete, но из-за вызова unsubscribe() observer прекратил прослушивание observable.

Между unsubscribe и complete огромная разница. Когда вы вызываете отказаться от подписки, вы закрываете канал между наблюдателем и наблюдаемым. Когда вы вызываете complete, вы закрываете канал между наблюдаемым и всеми наблюдателями, на него подписанными. Другие наблюдатели, слушающие ту же самую наблюдаемую, получат данные, когда вы вызовете функцию отмены подписки.

Observable не знает о наблюдателе до тех пор, пока он не подписан на него, а observable не отправляет данные, если не установлена ​​подписка (кроме Subject, которая не является частью этой статьи). Наблюдаемый объект обычно передает данные одному наблюдателю, но, используя метод share, несколько наблюдателей могут совместно использовать один и тот же поток данных (один и тот же наблюдаемый).

Вызов unsubscribe очень необходим, если вы не хотите, чтобы в одностраничном приложении возникали побочные эффекты при изменении представления или маршрута. Но есть одна огромная проблема. Когда мы вызвали unsubscribe, он просто закрыл канал данных между нашим subscriber и observable. Но это не прервало XHR. В этом случае полезная нагрузка ответа была довольно маленькой, но это мог быть большой файл или видеопоток. Если бы у нас был, скажем, SPA, и при изменении представления мы бы отказались от подписки, тогда этот запрос потреблял бы большую часть пропускной способности сети в фоновом режиме, создавая сетевую задержку для дальнейших запросов.

Эта статья предназначена для решения той же проблемы, и мы решим ее, углубившись в RxJS. Это заняло у меня много времени, чтобы понять, и я не мог найти никаких решений в Интернете, поэтому я буду бесстыдным и похлопаю себя за это. Итак, начнем.

Мы поиграли с простым созданием наблюдаемых объектов, используя метод Observable.create, а RxJS Subject - это способ создания наблюдаемых объектов, который позволяет посторонним генерировать и наблюдать данные. Давайте узнаем, как создать новый экземпляр класса Observable и работать с ним.

Конструктор Observable ожидает функцию, которая имеет observer в качестве аргумента и должна возвращать логику удаления. Как мы уже обсуждали, наблюдаемые не имеют представления о наблюдателе, пока он не подписан на него, поэтому нам нужно предоставить динамическую функцию, которая возвращает экземпляр Subscriber, который обрабатывает логику удаления и что делать, когда наблюдатель отписывается.

import { Observable, Subscriber } from 'rxjs';
let observable$ = new Observable( ( observer ) => {
    return new Subscriber( observer );
} );

Логика удаления - это не что иное, как экземпляр Subscriber, у которого есть разные хуки, такие как next, error, complete и unsubscribe. Мы также можем вызвать для него методы next, error и complete. Это возможно, потому что класс Subscriber реализует интерфейс Observer и расширяет класс Subscription. Subscriber конструкторы принимают observer. Мы можем использовать observer для вызова next, error и complete, как мы делали это в Observer.create.

В нашем случае нас интересует хук unsubscribe. Когда наблюдатель отписывается, нам нужно прервать XHR, инициированный axios. Но для реализации этих ловушек нам нужно создать новый класс, расширяющий класс Subscriber.

class AxiosSubscriber extends Subscriber {
    constructor( observer ) {
        super( observer );
        
        observer.next( 'HELLO' );
        observer.complete();
    }
   unsubscribe() {
        console.log( 'unsubscribed' );
        super.unsubscribe();
    }
}
let observable$ = new Observable( ( observer ) => {
    return new AxiosSubscriber( observer );
} );
let subscription = observable$.subscribe( console.log );

Остерегаться! unsubscribe может быть вызван несколько раз из-за внутренней реализации RxJS. Следовательно, не забудьте сохранить указатель в классе, который отслеживает, очищены ли уже ресурсы.

Чтобы отменить базовый XHR-запрос axios, мы собираемся использовать модуль axios-cancel. Этот модуль добавляет cancel метод прототипа для axios, который принимает requestId аргумент. Нам нужно создать и передать строку requestId при выполнении запроса axios.

import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';
import axiosCancel from 'axios-cancel';
// adds cancel prototype method
axiosCancel( axios );
class AxiosSubscriber extends Subscriber {
    constructor( observer ) {
        super( observer );
        // create sample request id
        this.requestId = Math.random() + '-xhr-id';
        // XHR complete pointer
        this.completed = false;
        // make axios request on subscription
        axios.get( 'https://jsonplaceholder.typicode.com/users', {
            requestId: this.requestId
        } )
        .then( ( response ) => {
            observer.next( response.data );
            this.completed = true;
            observer.complete();
        } )
        .catch( ( error ) => {
            this.completed = true;
            observer.error( error );
        } );
    }
    unsubscribe() {
        super.unsubscribe();
        
        // cancel XHR
        if( this.completed === false ) {
            axios.cancel( this.requestId );
            this.completed = true;
        }
    }
}
let observable$ = new Observable( ( observer ) => {
    return new AxiosSubscriber( observer );
} );
let subscription = observable$.subscribe( console.log );

Теперь нам просто нужно увидеть, когда мы откажемся от подписки, отменяет ли это XHR? В приведенной ниже реализации, как также показано в предыдущих примерах, мы отказываемся от подписки, как только отправляется запрос Ajax. Но на этот раз у нас есть логика отказа от подписки, и мы должны отменить XHR.

let subscription = observable$.subscribe( console.log );
setTimeout( () => {
    subscription.unsubscribe();
} );

Благодаря этому вам не нужно полагаться на другие наблюдаемые библиотеки HTTP, и вы также можете использовать свою любимую библиотеку HTTP, такую ​​как fetch или jQuery.

Это очень полезно при реализации switchMap. Оператор switchMap отменяет подписку и удаляет предыдущие наблюдаемые. Если вы работаете над плагином типа typeahead, который отправляет HTTP-запросы при каждом нажатии клавиши, то switchMap с наблюдаемым выше будет очень полезным. Основная мотивация копаться в RxJS для достижения всего этого - это чертовски упущенное фиаско. Но приведенный ниже пример сохранит вашу работу.

fromEvent( document.getElementById('typeahead-input'), 'input' )
.pipe(
    switchMap( () => observable$ )
)
.subscribe( console.log );