Я делаю перенос фреймворка AKKA для .NET (не воспринимайте это слишком серьезно, сейчас это часть актерской части на выходных)
У меня проблемы с поддержкой "Future" в нем. В Java / Scala Akka Futures следует ожидать синхронно с вызовом Await. Подобно .NET Task.Wait ()
Моя цель - поддержать для этого истинный асинхронный ожидание. Сейчас это работает, но в моем текущем решении продолжение выполняется не в том потоке.
Это результат передачи сообщения одному из моих акторов, содержащему блок ожидания на будущее. Как видите, субъект всегда выполняется в одном потоке, а блок ожидания выполняется в случайном потоке пула потоков.
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
Актер получает сообщение с помощью DataFlow BufferBlock<Message>
Или, скорее, я использую RX поверх буферного блока, чтобы подписаться на сообщения. Настроено это так:
var messages = new BufferBlock<Message>()
{
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
Все идет нормально.
Впрочем, когда жду будущего результата. вот так:
protected override void OnReceive(IMessage message)
{
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t => {
Console.WriteLine("await thread {0}",
System.Threading.Thread.CurrentThread.GetHashCode());
})
.Default(_ => Console.WriteLine("Unknown message"));
...
Я знаю, что это нормальное поведение async await, но я действительно должен убедиться, что только один поток имеет доступ к моему актору.
Я не хочу, чтобы будущее запускалось синхронно, я хочу запускать async, как обычно, но я хочу, чтобы продолжение выполнялось в том же потоке, что и обработчик сообщений / субъект.
Мой код для будущей поддержки выглядит так:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
TaskCompletionSource<IMessage> result =
new TaskCompletionSource<IMessage>();
var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());
// once this object gets a response,
// we set the result for the task completion source
var futureActorRef = new FutureActorRef(result);
future.Tell(new SetRespondTo(), futureActorRef);
actor.Tell(message, future);
return result.Task;
}
Есть идеи, что я могу сделать, чтобы заставить продолжение работать в том же потоке, который запустил приведенный выше код?
while(true);
. Как другой код может выполняться в этом потоке? Код нельзя прервать (в произвольных местах). Если ваша цепочка выглядела так:while(true) ExecuteNextItemForSyncContext();
, то кто-то может отправить элемент и ввести код. Посмотрите, как работают контексты синхронизации. - person usr   schedule 01.01.2014