Заставить задачу продолжить текущий поток?

Я делаю перенос фреймворка AKKA для .NET (не воспринимайте это слишком серьезно, сейчас это часть актерской части на выходных)

У меня проблемы с поддержкой "Future" в нем. В Java / Scala Akka Futures следует ожидать синхронно с вызовом Await. Подобно .NET Task.Wait ()

Моя цель - поддержать для этого истинный асинхронный ожидание. Сейчас это работает, но в моем текущем решении продолжение выполняется не в том потоке.

Это результат передачи сообщения одному из моих акторов, содержащему блок ожидания на будущее. Как видите, субъект всегда выполняется в одном потоке, а блок ожидания выполняется в случайном потоке пула потоков.

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...

Актер получает сообщение с помощью DataFlow BufferBlock<Message> Или, скорее, я использую RX поверх буферного блока, чтобы подписаться на сообщения. Настроено это так:

var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);

Все идет нормально.

Впрочем, когда жду будущего результата. вот так:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}",
          System.Threading.Thread.CurrentThread.GetHashCode());
        })
       .Default(_ => Console.WriteLine("Unknown message"));
     ...

Я знаю, что это нормальное поведение async await, но я действительно должен убедиться, что только один поток имеет доступ к моему актору.

Я не хочу, чтобы будущее запускалось синхронно, я хочу запускать async, как обычно, но я хочу, чтобы продолжение выполнялось в том же потоке, что и обработчик сообщений / субъект.

Мой код для будущей поддержки выглядит так:

public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}

Есть идеи, что я могу сделать, чтобы заставить продолжение работать в том же потоке, который запустил приведенный выше код?


person Roger Johansson    schedule 01.01.2014    source источник
comment
ОТ: если кто-то хочет помочь с этим или просто поиграться, код находится здесь github.com/rogeralsing / Голубь   -  person Roger Johansson    schedule 01.01.2014
comment
Что-то в этой ветке должно сотрудничать. Вы не можете перехватить нить. Поток должен как-то называть ваше продолжение. Возможно, используйте настраиваемый контекст синхронизации.   -  person usr    schedule 01.01.2014
comment
Разве нельзя как-то запланировать продолжение только для активного потока?   -  person Roger Johansson    schedule 01.01.2014
comment
Представьте, что поток делает это: while(true);. Как другой код может выполняться в этом потоке? Код нельзя прервать (в произвольных местах). Если ваша цепочка выглядела так: while(true) ExecuteNextItemForSyncContext();, то кто-то может отправить элемент и ввести код. Посмотрите, как работают контексты синхронизации.   -  person usr    schedule 01.01.2014
comment
Я почти уверен, что здесь последний случай, поскольку BufferBlock использует обычные потоки пула потоков, поэтому он должен планировать сообщения для определенного потока, я думаю   -  person Roger Johansson    schedule 01.01.2014
comment
Потоки Threadpool не берут определенные рабочие элементы, они берут неопределенные элементы из очереди. Вы не сможете настроить таргетинг на какой-либо конкретный поток. Однако вы можете использовать настраиваемый TaskScheduler, который использует потоки, которые вы контролируете. Теперь это становится непросто. Может, проще убрать требование попадания в одну ветку ?!   -  person usr    schedule 01.01.2014
comment
Это было бы очень плохо, если актор обрабатывает сообщение и в то же время другой поток обрабатывает продолжение ожидания для того же актера, возникнут проблемы с параллелизмом, которые актор должен предотвратить.   -  person Roger Johansson    schedule 01.01.2014
comment
Я решил это :) Актеры FTW, как только результат получен, я оборачиваю источник завершения в Action и передаю его как специальное сообщение собственному актору, таким образом SetResult вызывается из самого Actor   -  person Roger Johansson    schedule 01.01.2014
comment
Я считаю, что это работает только случайно. SetResult не гарантирует синхронное выполнение зарегистрированных обратных вызовов. Они могут случайно оказаться в пуле потоков.   -  person usr    schedule 01.01.2014
comment
Это не имеет значения, поскольку SetResult блокирует, он использует SpinWait до тех пор, пока задача не будет завершена, таким образом, актор, который получает сообщение с действием, блокируется на короткое время, когда задача завершается, и, таким образом, нет внутреннего параллелизма. могут возникнуть проблемы, однако похоже, что он всегда выполняется в одном потоке из тестов, которые я сделал прямо сейчас. не уверен на 100%, но, как я уже сказал, поскольку SetResult блокирует, это не имеет значения   -  person Roger Johansson    schedule 01.01.2014
comment
Я пытался сказать, что SetResult не гарантирует блокировку. Если вы не найдете этого утверждения в документации, это не гарантируется, и вы не можете на него полагаться. Вы также не можете проверить это.   -  person usr    schedule 01.01.2014
comment
Код внутри SetResult выдает исключение, если задача не завершается правильно во время ожидания спина, так как она выдает ошибку, это было бы критическим изменением, если бы они когда-либо изменили его, чтобы не блокировать   -  person Roger Johansson    schedule 01.01.2014
comment
Я нашел это задокументированным в MSDN: msdn.microsoft.com/en-us/library/ ExecuteSynchronously Указывает, что задача продолжения должна выполняться синхронно. Если указана эта опция, продолжение будет выполняться в том же потоке, который вызывает переход предшествующей задачи в ее конечное состояние. Если антецедент уже завершен при создании продолжения, продолжение будет выполняться в потоке, создающем продолжение. Только очень кратковременные продолжения должны выполняться синхронно. Это надежно.   -  person usr    schedule 01.01.2014
comment
@StephenCleary полезно знать! Параллелизм - такая деликатная тема. Эта неточная документация вызывает разочарование.   -  person usr    schedule 01.01.2014
comment
В любом случае, TrySetResult следует считать надежным, поскольку он возвращает логическое значение для успеха / неудачи, он не может вернуть true, если он не блокируется до тех пор, пока задача не будет завершена.   -  person Roger Johansson    schedule 02.01.2014
comment
неправильная ветка: почему вы считаете, что одна ветка лучше другой? Вы тоже думаете, что аппаратный процессор может быть лучше других?   -  person Alexei Kaigorodov    schedule 19.01.2020
comment
Этой теме 6 лет .....   -  person Roger Johansson    schedule 19.01.2020


Ответы (2)


Я делаю перенос фреймворка AKKA для .NET

Милая. Я пошел на лекцию по Akka на CodeMash '13, несмотря на то, что никогда не касался Java / Scala / Akka. Я увидел большой потенциал для библиотеки / фреймворка .NET. Microsoft работает над чем-то похожим, которое, я надеюсь, в конечном итоге станет общедоступным (это в настоящее время в ограниченном превью).

Я подозреваю, что оставаться в мире Dataflow / Rx как можно больше - это более простой подход; async лучше всего подходит для асинхронных операций (с одним запуском и одним результатом для каждой операции), тогда как Dataflow и Rx лучше работают с потоками и подписками (с одним запуском и несколькими результатами). Итак, моя первая внутренняя реакция - либо связать буферный блок с ActionBlock с конкретным планировщиком, либо использовать ObserveOn для перемещения уведомлений Rx в конкретный планировщик вместо того, чтобы пытаться сделать это на стороне async. Конечно, я не очень хорошо знаком с дизайном API Akka, так что отнеситесь к этому с недоверием.

В любом случае, мое async intro описывает только два надежных варианта планирования await продолжения: SynchronizationContext.Current и TaskScheduler.Current. Если ваш порт Akka больше похож на фреймворк (где ваш код выполняет хостинг, а код конечного пользователя всегда выполняется вашим кодом), тогда SynchronizationContext может иметь смысл. Если ваш порт больше похож на библиотеку (где код конечного пользователя выполняет хостинг и при необходимости вызывает ваш код), тогда TaskScheduler будет иметь больше смысла.

Примеров нестандартного SynchronizationContext не так много, потому что это довольно редко. У меня есть AsyncContextThread type в моем библиотека AsyncEx, которая определяет как SynchronizationContext, так и TaskScheduler для этого потока. Существует несколько примеров пользовательских TaskScheduler, например Параллельные расширения Extras, в котором есть планировщик STA и планировщик текущего потока.

person Stephen Cleary    schedule 01.01.2014
comment
Обратите внимание, что Microsoft публично работает над библиотекой ActorFx. Мне кажется неясным, следует ли объединить Orleans и ActorFx в будущем или в чем будет заключаться сделка. Также было некоторое обсуждение группы пользователей F # относительно порта Akka на F # и указателей на некоторый код реализации. Я предполагаю, что основные моменты находятся в Fakka - F # Akka и роли, которую он может играть на f # более широкую привлекательность. - person Veksi; 07.01.2014

Планировщик задач решает, запускать ли задачу в новом потоке или в текущем потоке. Есть возможность принудительно запустить его в новом потоке, но не заставить его запускаться в текущем потоке. Но есть метод Task.RunSynchronously (), который запускает задачу синхронно в текущем TaskScheduler. Также, если вы используете async / await, по этому поводу уже есть аналогичный вопрос.

person Yanshof    schedule 01.01.2014
comment
Но этот вопрос не о запуске задачи, а о том, где выполнение продолжается после ожидания. - person svick; 01.01.2014