Создайте Observable, который задерживает следующее значение

Я пытаюсь создать наблюдаемое с помощью RxJS, которое делает то, что изображено.

Ожидаемое наблюдаемое сопоставление

  • Берет значение и ждет фиксированный период времени, прежде чем получить следующее.
  • Следующее будет последним значением, выпущенным в период ожидания, пропуская остальные.
  • Если проходит интервал ожидания, в течение которого не было передано никакого значения, следует немедленно захватить следующее, как показано в последнем примере изображения.

person queimadus    schedule 22.05.2014    source источник
comment
Спасибо за мраморную диаграмму.   -  person cwharris    schedule 23.05.2014


Ответы (2)


Это должно помочь.

var Rx      = require('rx'),
    source  = Rx.Observable.interval(10).take(100),
    log     = console.log.bind(console);

Rx.Observable.create(function (observer) {

    var delaying = false,
        hasValue = false,
        complete = false,
        value;

    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    function sendValue () {
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        setTimeout(callback, 1000); // exercise for the reader. Use a scheduler.
      }
      delaying = true;
    }

    function callback () {
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    return source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );
  })
  .subscribe(log);
person cwharris    schedule 24.05.2014
comment
Отличное решение. Прекрасно работает. Можно ли преобразовать это в оператор LINQ? У нас будет что-то вроде этого: source.delayNext(other).subscribe() Чтобы мы могли использовать значения other для динамического выбора времени задержки следующего элемента. - person queimadus; 24.05.2014
comment
Абсолютно. Вы должны взглянуть на исходный код Rx и посмотреть, как это сделать. :) - person cwharris; 24.05.2014

Вот решение Кристофера, преобразованное в оператора.

Оператор throttleImmediate сохраняет только последнее значение из источника до тех пор, пока данный селектор не завершится. Он запускает кешированное значение, если оно существует, сразу после каждого завершения. Лучше всего использовать, когда у селектора есть побочные эффекты (например, анимация).

var Rx  = require('rx'),
source  = Rx.Observable.interval(10).take(500),
log     = console.log.bind(console);

Rx.Observable.prototype.throttleImmediate = function (selector) {
    var source = this;

    return Rx.Observable.create(function (observer) {

        var delaying = false,
            hasValue = false,
            complete = false,
            value;

        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        function sendValue () {
          delaying = true;
          selector(value).subscribe(
            observer.onNext.bind(observer),
            observer.onError.bind(observer),
            function(){
              if (hasValue) {
                hasValue = false;
                sendValue();
              } else {
                delaying = false;
              }
            }
          );
        }

        return source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (hasValue) {
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          );
      });
};

source
  .throttleImmediate(function(data){
    var delay;

    if(data%2==0)
      delay=500;
    else
      delay=1000;

    return Rx.Observable.timer(delay).map(function(){ return data; });
  })
  .subscribe(log)

Это удобно при обратном давлении на источники, где значение задержки известно только селектору.

Пример: учитывая мраморную диаграмму вопроса.

Предположим, что первым источником являются вызовы ajax с отображаемыми данными html, ajaxPages которые исходят из щелчков на панели навигации. И мы хотим визуализировать их вместе с анимацией входа animatePage, продолжительность которой является динамической.

ajaxPages.throttleImmediate(animatePage).subscribe();

Здесь мы анимируем страницы значениями из источника, пропуская все значения, которые генерируются в течение периода анимации, кроме самого последнего.

На практике мы получаем поток, который игнорирует щелчки, за которыми вскоре следуют другие щелчки, и бесполезно показывать пользователю, поскольку они будут анимироваться и немедленно анимируются.

person queimadus    schedule 26.05.2014
comment
Исправление сообщения until обычно используется в сочетании с автоматическим удалением (takeUntil). Я бы предложил использовать что-то другое, поскольку delayUntil звучит так, будто вы хотите отложить значение до тех пор, пока наблюдаемый объект animatePage не даст результат. На самом деле, оператора можно было бы точнее описать как throttle. Однако оператор с таким именем уже существует в Rx. Этот оператор ДЕЙСТВИТЕЛЬНО должен называться debounce, но на данный момент мы ничего не можем с этим поделать. Поэтому я бы выбрал что-то вроде thottleImmediate. - person cwharris; 27.05.2014
comment
Оператор throttle - печальная история, но на данном этапе его слишком часто переименовали ...: / - person cwharris; 27.05.2014