Observable.Generate Delays results

Следующий код задерживает результаты на 2 секунды. Я хочу немедленно вернуть результаты, но запускать новую наблюдаемую каждые 2 секунды. Что мне не хватает?

ВЫХОДЫ:

**The current output is:**
05: 1. Run
07 Result: 1

07: 2. Run
09 Result: 2

09: 3. Run
11 Result: 3


**Desired output is:**
05: 1. Run
05 Result: 1

07: 2. Run
07 Result: 2

09: 3. Run
09 Result: 3

КОД:

    var sources = Enumerable.Range(1, 8).Select(i =>
                                                {
                                                    Console.WriteLine("{0}: {1}. Run", DateTimeOffset.Now.ToString("ss"), i);

                                                    return Observable.Return(i, CurrentThreadScheduler.Instance);
                                                });

    Observable.Generate(sources.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(2000), ThreadPoolScheduler.Instance)
              .Merge()
              .Timestamp()
              .Do(r =>
                  {
                      Console.WriteLine("{0} Result: {1}{2}", r.Timestamp.ToString("ss"), r.Value, Environment.NewLine);
                  },
                  ex =>
                  {
                      Console.WriteLine(ex.ToString());
                  },
                  () =>
                  {
                      Console.WriteLine("Completed");
                  })
              .Subscribe();

person Samet S.    schedule 15.11.2014    source источник
comment
На самом деле совсем не ясно, что вам здесь нужно - я предполагаю, что ваш комментарий к ответу Пола означает, что вам нужны переменные интервалы, управляемые данными, но помимо этого (на мой обученный глаз Rx) в вашем коде есть много странных вещей. Возможно, было бы полезно объяснить не в терминах rx, чего вы пытаетесь достичь. Например, не совсем понятно, зачем вы создаете IEnumerable<IObservable> для sources, как это похоже на IEnumerable<>. Также неясно, должен ли sources содержать данные, указывающие желаемые интервалы.   -  person James World    schedule 17.11.2014


Ответы (2)


Как насчет:

Observable.Interval(TimeSpan.Zero, TimeSpan.FromSeconds(2.0))
    .SelectMany(_ => GenerateAnObservable())
    .Subscribe(/* ... */);
person Ana Betts    schedule 15.11.2014
comment
Я использую generate для селектора с переменным периодом в каждом цикле. - person Samet S.; 15.11.2014

Как насчет того, чтобы заменить текущий Observable.Generate на это:

Observable
    .Generate(
        0,
        i => true,
        i => i + 1,
        i => i,
        i => TimeSpan.FromMilliseconds(i == 0 ? 0 : 2000),
        ThreadPoolScheduler.Instance)
    .Zip(sources, (g, s) => s)

Получил такой результат:

41: 1. Run
41 Result: 1

43: 2. Run
43 Result: 2

45: 3. Run
45 Result: 3

47: 4. Run
47 Result: 4

49: 5. Run
49 Result: 5

51: 6. Run
51 Result: 6

53: 7. Run
53 Result: 7

55: 8. Run
55 Result: 8

Completed
person Enigmativity    schedule 16.11.2014