Использование TPL для создания последовательного порядка действий с помощью предложения continueWith

Прежде всего, я объясню, что я пытаюсь сделать.

У меня есть компонент A, который использует компонент B.

Чтобы общаться между ними обоими, мне нужно использовать event.

Одним из моих предварительных условий здесь является то, чтобы компонент B работал асинхронно И запускал обработчик событий в последовательном порядке, в котором они были вызваны.

Кроме того, я хотел бы отменить трубку вызова (когда пользователь спросит об этом). Таким образом, все вызванные обработчики событий, которые еще не выполнены, никогда не будут выполняться.

Решение для достижения на является TPL. Я сделал POC того, что я пытаюсь сделать:

    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token));
                            //.ContinueWith((prevTask) => DoSomeWork(token));

        t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

Вот вывод этого фрагмента:

1

Заканчивать

2

Как видите, он заканчивается раньше, чем на самом деле. Он отображает 2 после Готово.

Если я изменю предыдущий код этим, он будет работать:

        static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token))
                            .ContinueWith((prevTask) => DoSomeWork(token));

        //t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

Вот вывод этого фрагмента:

1

2

Заканчивать

Как вы понимаете, мне нужно использовать оператор continueWith не в объявлении задачи, а при возникновении события.

Почему Task.WaitAll(t); не работает первый образец?

Кто-нибудь может мне помочь?


person Guillaume P.    schedule 13.06.2016    source источник
comment
Вы создаете 2 задачи. (Один с StartNew и один с ContinueWith). В первом примере t содержит задачу StartNew, а во втором примере t содержит задачу ContinueWith. Используйте var t1 = t.ContinueWith...; Task.WaitAll(t1);, чтобы они вели себя одинаково   -  person adrianm    schedule 13.06.2016
comment
Это работает отлично. Спасибо. Я думал, что это та же самая задача. Но действительно, continueWith создает новую задачу. Так я и сделал: t = t.ContinueWith((prevTask) => DoSomeWork(token));   -  person Guillaume P.    schedule 13.06.2016
comment
@GuillaumeG.: Простое примечание, но вы обнаружите, что асинхронное программирование работает намного лучше, если вы замените каждый ContinueWith на await и каждый Task.Factory.StartNew на Task.Run.   -  person Stephen Cleary    schedule 13.06.2016
comment
@StephenCleary Это лучший способ использовать await/async + Task.Run с точки зрения производительности, правда? Является ли это хорошей практикой?   -  person Guillaume P.    schedule 13.06.2016
comment
@GuillaumeG.: У меня есть несколько сообщений в блоге на проблемы с ContinueWith и проблемы с StartNew. Причины связаны с ремонтопригодностью и правильностью кода, а не с производительностью.   -  person Stephen Cleary    schedule 13.06.2016
comment
@StephenCleary Хорошо, я прочитаю. Спасибо.   -  person Guillaume P.    schedule 13.06.2016


Ответы (2)


Первоначальная проблема заключается в том, что вы создаете две задачи, но ожидаете только одну.

// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// the continuation task is not assigned
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait only on "t", which is the first task
Console.WriteLine("Finish"); // when the first task finishes, this gets printed
// now the continuation task is executing, but you are not waiting for it

Что происходит во втором фрагменте, так это то, что вы ожидаете задачи продолжения, поэтому она будет ждать, пока она не завершится.

// t is now the continuation task
var t = Task.Factory.StartNew(() => DoSomeWork(token))
             .ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait till the continuation task has finished

Итак, второй метод в порядке, но если вы хотите более тонкое управление, просто назначьте переменную задачи для ожидания продолжения задачи:

// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// The continuation task is assigned to "t2"
var t2 = t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(new [] { t, t2 } ); // <-- wait for all tasks
Console.WriteLine("Finish");

Примечание. Я следовал вашему коду в примерах, но WaitAll не принимает ни одной задачи в качестве параметра (он принимает массив задач), поэтому, вероятно, не компилируется. Вы либо используете Task.Wait, либо передаете массив WaitAll

person Jcl    schedule 13.06.2016
comment
Я думал, что это была та же самая задача. Спасибо, работает нормально. Task.WaitAll берет массив Task, но если вы передаете только один, он работает (неявно преобразуется в массив ОДНОЙ задачи). - person Guillaume P.; 13.06.2016

Правильный способ асинхронного кодирования на C# — использовать ключевое слово await.

public async Task DoLotsOfWork()
{
    await DoSomeWorkAsync();
    await DoSomeMoreWorkAsync();
    Console.WriteLine("Finish");
}

У вас будут некоторые проблемы с запуском этого кода из консольного приложения, поэтому я бы посоветовал вам использовать библиотеку Task.AsyncEx @StephenCleary.

https://www.nuget.org/packages/Nito.AsyncEx/

Вы используете это так.

public void Main()
{
    AsyncContext.Run(DoLotsOfWork);
}

В дальнейшем. Существует очень мало причин для использования методов Task.Run (или, что еще хуже, Task.Factory.StartNew). Они запускают ваш метод в фоновом режиме как часть работы Threadpool.

Например

private static async Task DoSomeWorkAsync(CancellationToken ct)
{
    await Task.Delay(TimeSpan.FromMilliseconds(1000), ct);
    Console.WriteLine(id++);
}

Это НЕ будет работать ни в одном потоке (таким образом, не блокируя ни один поток). Вместо этого создается таймер/обратный вызов, чтобы основной поток возвращался ко второй строке через 1000 миллисекунд.

РЕДАКТИРОВАТЬ: сделать это динамически также довольно просто

public async Task DoLotsOfWork(IEnumerable<Func<Task>> tasks)
{
    foreach(var task in tasks)
        await task();
    Console.WriteLine("Finished");
}

Однако, если вы спросите о методах, которые используют ужасный шаблон EAP, я бы посоветовал вам использовать вспомогательную функцию Rx Observable.FromEventPattern.

public async Task SendEmail(MailMessage message)
{
    using(var smtp = new SmtpClient())
    {
        smtp.SendAsync(message);
        await Observable.FromEventPattern<>(x => smtp.SendCompleted +=x, x => smtp.SendCompleted -=x)
                  .ToTask()
    }
}

Дальнейшее РЕДАКТИРОВАТЬ:

public class Publisher
{
    public IObservable<CancelationToken> SomeEvent {get;}
}

public abstract class Subscriber
{
    public abstract IObservable<CancelationToken> Subscribe(IObservable<CancelationToken> observable);

}

IEnumerable<Subscriber> subscribers = ...
Publisher publisher = ...

IDisposable subscription = subscribers.Aggregate(publisher.SomeEvent, (e, sub) => sub.Subscribe(e)).Subscribe();

//Dispose the subscription when you want to stop listening.
person Aron    schedule 13.06.2016
comment
Как я уже указал, мне нужно выполнить обработчик событий обратного вызова последовательно. И я не знаю порядок в начале. Это определяется во время выполнения. - person Guillaume P.; 13.06.2016
comment
@GuillaumeG. Что вы подразумеваете под обработчиком событий? Вы имеете в виду System.EventHandler? Непонятно, чего вы пытаетесь добиться, почему вы хотите использовать TPL. - person Aron; 13.06.2016
comment
Я постараюсь прояснить это. Мой ComponentB вызывает событие, а ComponentA реализует подписчиков на эти события (обработчик событий означает метод, выполняемый путем привязки к событиям). Это проясняет? И там мой ComponentB представляет собой реализацию workflowFoundation, которая последовательно выполняет список действий, и мне нужно информировать мой ComponentA о прогрессе (и делиться полезным состоянием). По определению, событие C# является синхронным, поэтому я использовал (сторона компонента B) TPL, чтобы сохранить порядок вызова событий и позволить ComponentB работать, не дожидаясь результата (сторона ComponentA). - person Guillaume P.; 13.06.2016
comment
@GuillaumeG. Нет. Похоже, вам нужен общий паб/подписка, а не событие. В этом случае вы хотите использовать Rx, а не TPL. - person Aron; 13.06.2016
comment
Я отредактировал свой пост, вы можете посмотреть, пожалуйста. Вам теперь лучше, что я имею в виду? - person Guillaume P.; 13.06.2016
comment
Pub/Sub и мероприятие — это не одно и то же? - person Guillaume P.; 13.06.2016
comment
@GuillaumeG. Я пытаюсь разобраться в некоторых сложностях, в которые вы вписали себя. Какова природа Подписчиков, почему вы считаете, что Task.Factory.StartNew важно? Без более конкретного понимания я не понимаю, почему вы используете параллелизм для реализации асинхронности... - person Aron; 13.06.2016
comment
Теперь это работает, спасибо за вашу помощь. Если вы хотите, чтобы я объяснил вам более подробно, чего я пытался достичь, просто скажите мне. Я хотел бы узнать больше о .NET. Просто скажите мне. - person Guillaume P.; 13.06.2016
comment
В своем редактировании вы используете шаблон Oberver, верно? Это лучшее решение. - person Guillaume P.; 13.06.2016
comment
@GuillaumeG, в частности, использует реализацию шаблона Reactive Extension. Который, как я обнаружил, решает многие болевые точки, которые у меня были с другими инструментами, особенно с утечками памяти. - person Aron; 13.06.2016