Как изменить реализацию Rx Builder, чтобы исправить исключение переполнения стека?

Я пытаюсь придумать Rx Builder для использования Reactive Extension в синтаксисе F# Computation Expression. Как мне исправить это, чтобы он не взорвал стек? Как пример Seq ниже. И есть ли планы по реализации RxBuilder как части Reactive Extensions или как части будущих версий .NET Framework?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

person Holoed    schedule 28.05.2011    source источник
comment
Это может вас заинтересовать: feedproxy.google.com /~r/FCode/~3/rDQbQHOROOw/   -  person Ramon Snir    schedule 28.05.2011
comment
Вы смотрели на оператор Expand в последних сборках Rx? Он обрабатывает определенные сценарии рекурсии.   -  person Richard Szalay    schedule 29.05.2011
comment
Как бы вы применили Observable.Expand к этой проблеме? У вас есть ссылка, объясняющая семантику Expand ?   -  person Holoed    schedule 29.05.2011
comment
@Ramon: это блог Holoed :)   -  person Mauricio Scheffer    schedule 29.05.2011


Ответы (5)


Короткий ответ заключается в том, что Rx Framework не поддерживает создание наблюдаемых с использованием такого рекурсивного шаблона, поэтому это нелегко сделать. Операция Combine, используемая для последовательностей F#, требует специальной обработки, которую не обеспечивают наблюдаемые объекты. Rx Framework, вероятно, ожидает, что вы будете генерировать наблюдаемые объекты с помощью Observable.Generate, а затем использовать для их обработки запросы LINQ/построитель вычислений F#.

Тем не менее, вот некоторые мысли -

Прежде всего, вам нужно заменить Observable.merge на Observable.Concat. Первый запускает обе наблюдаемые параллельно, а второй сначала выдает все значения из первой наблюдаемой, а затем создает значения из второй наблюдаемой. После этого изменения фрагмент будет печатать как минимум ~800 чисел до переполнения стека.

Причина переполнения стека заключается в том, что Concat создает наблюдаемую, которая вызывает Concat, чтобы создать другую наблюдаемую, которая вызывает Concat и т. д. Один из способов решить эту проблему — добавить некоторую синхронизацию. Если вы используете Windows Forms, вы можете изменить Delay так, чтобы он планировал наблюдаемое в потоке графического интерфейса (который отбрасывает текущий стек). Вот набросок:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

Чтобы реализовать это должным образом, вам пришлось бы написать свой собственный метод Concat, что довольно сложно. Идея будет заключаться в следующем:

  • Concat возвращает какой-то специальный тип, например. IConcatenatedObservable
  • Когда метод вызывается рекурсивно, вы создаете цепочку из IConcatenatedObservable, которые ссылаются друг на друга.
  • Метод Concat будет искать эту цепочку, и когда есть, например. три объекта, он бросит средний (чтобы всегда сохранять цепочку длиной не более 2).

Это слишком сложно для ответа StackOverflow, но это может быть полезным отзывом для команды Rx.

person Tomas Petricek    schedule 28.05.2011
comment
Спасибо. Отличный ответ. Я надеюсь, что команда Rx слушает :) - person Holoed; 29.05.2011
comment
Не могли бы вы использовать TakeUntil для удаления и удаления старых наблюдаемых? Конечно, вам понадобится какой-то способ ухватиться за предыдущую наблюдаемую. - person ; 08.06.2011

Обратите внимание, что это было исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов последовательности (Concat, Catch, OnErrorResumeNext), а также императивных операторов (If, While и т. д.).

По сути, вы можете думать об этом классе операторов как о подписке на другую последовательность в сообщении наблюдателя терминала (например, Concat подписывается на следующую последовательность после получения текущего сообщения OnCompleted), и здесь возникает аналогия с хвостовой рекурсией.

В Rx v2.0 все подписки с хвостовой рекурсией объединены в структуру данных, подобную очереди, для обработки по одной за раз, взаимодействуя с нижестоящим наблюдателем. Это позволяет избежать неограниченного роста количества наблюдателей, разговаривающих друг с другом для последовательных подписок на последовательности.

person Bart De Smet    schedule 05.09.2012

Это было исправлено в Rx 2.0 Бета. А вот тест.

person Community    schedule 07.06.2011

Как насчет чего-то подобного?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (создан для экспериментов с RxBuilder)

Одноразовый xs не подключен. Как только я пытаюсь подключить одноразовые провода, он снова взрывает стопку.

person Holoed    schedule 01.06.2011

Если мы удалим синтаксический сахар из этого вычислительного выражения (также известного как Монада), мы получим:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Or in C#:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

Что определенно не является хвостовой рекурсией. Я думаю, что если вы сможете сделать его рекурсивным, то это, вероятно, решит вашу проблему.

person Ankur    schedule 29.05.2011