Почему мне нужно избавляться от подписок после завершения?

Книга Intro To RX описывает возвращаемое значение OnSubscribe как IDisposible и отмечает, что от подписок следует избавляться при вызове OnError и OnCompleted.

Следует учесть, что, когда последовательность завершается или возникает ошибка, вы все равно должны избавиться от своей подписки.

Из Введение в RX: Lifetime Management, OnError и OnCompleted

Почему это?


Для справки, это класс, над которым я сейчас работаю. Я, вероятно, собираюсь отправить его на проверку кода в какой-то момент.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private object signalLock = new object();
    private IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
    /// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
    public TrafficTimeout(TimeSpan timeout, Action onTimeout)
    {
        // Subscribe to a throttled observable to trigger the expirey
        var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
        IDisposable subscription = null;
        subscription = messageQueue.Throttle(timeout).Subscribe(
        p =>
        {
            messageQueue.OnCompleted();
            messageQueue.Dispose();
        });

        this.signals = messageQueue.AsObserver();
        this.onTimeout = onTimeout;
    }

    /// <summary>
    /// Signals that traffic has been received.
    /// </summary>
    public void Signal()
    {
        lock (this.signalLock)
        {
            this.signals.OnNext(Unit.Default);
        }
    }
}

person Gusdor    schedule 17.11.2017    source источник


Ответы (2)


Одноразовый объект, возвращаемый Subscribe методами расширения, возвращается исключительно для того, чтобы вы могли вручную отказаться от подписки на наблюдаемый до естественного завершения наблюдаемого.

Если наблюдаемый завершается - либо с OnCompleted, либо с OnError, то подписка уже удалена для вас.

Попробуйте этот код:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

Если вы запустите вышеуказанное, вы увидите, что "Disposed!" записывается в консоль, когда наблюдаемый завершается без необходимости вызова .Dispose() в подписке.

Следует отметить одну важную вещь: сборщик мусора никогда не вызывает .Dispose() для наблюдаемых подписок, поэтому вы должны удалить свои подписки, если они не закончились (или могли не закончиться) естественным образом до того, как ваша подписка выйдет за пределы области действия.

Возьмем, к примеру:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

Наблюдаемый ds будет присоединяться к обработчику событий только тогда, когда у него есть подписка, и отключается только после завершения наблюдаемого объекта или удаления подписки. Поскольку это обработчик событий, наблюдаемый объект никогда не завершится, потому что он ожидает других событий, и, следовательно, удаление - единственный способ отсоединиться от события (для приведенного выше примера).

Если у вас есть FromEventPattern наблюдаемый, который, как вы знаете, когда-либо будет возвращать только одно значение, тогда разумно добавить метод расширения .Take(1) перед подпиской, чтобы обработчик событий мог автоматически отсоединяться, и тогда вам не нужно вручную удалять подписку.

Вот так:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);

Надеюсь, это поможет.

person Enigmativity    schedule 18.11.2017
comment
@Gusdor - Ничего страшного. Вы видели мой пример использования рекурсивного планирования, который я добавил к своему ответу на ваш другой вопрос Rx? stackoverflow.com/a/47275900/259769 - person Enigmativity; 19.11.2017

Во-первых, вот пример проблемы, с которой вы можете столкнуться:

void Main()
{
    Console.WriteLine(GC.GetTotalMemory(true));
    for (int i = 0; i < 1000; i++)
    {
        DumbSubscription();
        Console.WriteLine(GC.GetTotalMemory(true));
    }
    Console.WriteLine(GC.GetTotalMemory(true));
}

public void DumbSubscription()
{
    Observable.Interval(TimeSpan.FromMilliseconds(50))
        .Subscribe(i => {});
}

Вы увидите, как использование памяти постоянно увеличивается. Активные подписки Rx не собираются сборщиком мусора, и эта наблюдаемая величина бесконечна. Следовательно, если вы увеличите предел цикла или добавите задержку, у вас просто будет больше потраченной впустую памяти: ничто не поможет вам, кроме удаления этих подписок.

Однако, допустим, мы изменили определение DumbSubscription на следующее:

public void DumbSubscription()
{
    Observable.Interval(TimeSpan.FromMilliseconds(50))
        .Take(1)
        .Subscribe(i => {});
}

Добавление .Take(1) означает, что наблюдаемое завершится через один интервал, поэтому оно больше не бесконечно. Вы увидите, что использование памяти стабилизируется: подписки имеют тенденцию правильно уничтожать себя после завершения или исключения.

Однако это не меняет того факта, что, как и любой другой IDisposable, рекомендуется вызывать Dispose (вручную или через using), чтобы убедиться, что ресурсы правильно утилизированы. Кроме того, если вы настроите свой наблюдаемый объект, вы легко можете столкнуться с проблемой утечки памяти, указанной в начале.

person Shlomo    schedule 17.11.2017
comment
В моем примере класса OnCompleted вызывается для BehaviourSubject, поэтому я думаю, можно с уверенностью предположить, что последовательность не бесконечна. Уместно ли вы закончить оценкой управления сроком службы в примере кода? В качестве альтернативы я создал для этой цели стек проверки кода. ›› codereview.stackexchange.com/questions/180708/. - person Gusdor; 18.11.2017