Ответив на ваши вопросы (см. Ниже), я понял, что моделирование вашего дизайна с помощью блоков TPL Dataflow, вероятно, не лучшая идея. TDF хорош для обработки сообщений в значительной степени независимыми блоками без встроенного способа отслеживания отдельного сообщения. Но это то, что вы, кажется, хотите: обрабатывать сообщение обработчиками последовательно с отслеживанием завершения каждого сообщения.
Из-за этого, я думаю, вам не следует создавать целую сеть потоков данных, а вместо этого использовать один ActionBlock
в качестве процессора асинхронных сообщений:
public class Bus
{
class Subscription
{
public Guid Id { get; private set; }
public Func<object, Task> HandlerAction { get; private set; }
public Subscription(Guid id, Func<object, Task> handlerAction)
{
Id = id;
HandlerAction = handlerAction;
}
}
private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();
private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;
public Bus()
{
// subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
var subscriptions = new List<Subscription>();
m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
async tuple =>
{
var message = tuple.Item1;
var completedAction = tuple.Item2;
// could be made more efficient, probably doesn't matter
Guid idToUnsubscribe;
while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
{
subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
}
Subscription handlerToSubscribe;
while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
{
subscriptions.Add(handlerToSubscribe);
}
foreach (var subscription in subscriptions)
{
await subscription.HandlerAction(message);
}
completedAction();
});
}
public Task SendAsync<TMessage>(TMessage message)
{
var tcs = new TaskCompletionSource<bool>();
Action completedAction = () => tcs.SetResult(true);
m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));
return tcs.Task;
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
return Subscribe<TMessage>(
message =>
{
handlerAction(message);
// we need a completed non-generic Task; this is a simple, efficient way to get it
// another option would be to use async lambda with no await,
// but that's less efficient and produces a warning
return Task.FromResult(false);
});
}
public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
{
Func<object, Task> actionWithCheck = async message =>
{
if (message is TMessage)
await handlerAction((TMessage)message);
};
var id = Guid.NewGuid();
m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
return id;
}
public void Unsubscribe(Guid subscriptionId)
{
m_idsToUnsubscribe.Enqueue(subscriptionId);
}
}
(Я решил использовать очереди для подписки и отписки, чтобы список обработчиков не менялся во время обработки сообщения.)
Ответы на ваши вопросы
Является ли BroadcastBlock<T>
рекомендуемым источником для одновременной отправки сообщений нескольким обработчикам?
Да, на первый взгляд кажется, что BroadcastBlock<T>
- это то, что вы хотите. Непосредственно в TPL Dataflow точно нет подобного блока.
В своей реализации я использую один экземпляр BroadcastBlock для всех типов сообщений. Может ли это вызвать проблемы при обработке большого количества сообщений? Должен ли я создавать отдельный экземпляр для каждого типа сообщения?
Имея единый блок для всех типов сообщений, вы выполняете больше работы (отправку всем обработчикам) в одном потоке. Имея один блок для каждого типа сообщения, вы будете выполнять меньше работы (отправляя только правильным обработчикам), которая может выполняться в нескольких потоках. В связи с этим, думаю, разумно предположить, что последний будет быстрее.
Но не забывайте правила оптимизации производительности приложений: во-первых, напишите простой и читаемый код. Только если окажется, что он действительно медленный, попробуйте его оптимизировать. И при сравнении двух альтернатив всегда используйте профилирование, чтобы выяснить, какая из них на самом деле быстрее, а не просто угадывайте, какая из них должна быть быстрее.
BroadcastBlock<T>
всегда сохраняет последний отправленный элемент. Это означает, что при любых новых подписках (ссылках) это сообщение будет автоматически передаваться. Можно ли изменить это поведение (новые подписки должны получать только новые сообщения)?
Нет, настроить BroadcastBlock<T>
для этого невозможно. Если вам не нужны все функции BroadcastBlock<T>
(отправка в блоки с ограниченной емкостью, которые могут быть временно заполнены, поддержка нежадных блоков в качестве целей), вы можете написать для этого собственную версию BroadcastBlock<T>
.
При отправке сообщения я ожидал, что каждое сообщение будет выводиться на консоль одно за другим с шагом 2 секунды. Вместо этого через 2 секунды все сообщения выводились сразу. Я предполагаю, что это связано с параллелизмом, выполняемым базовым планировщиком, но мне любопытно, как я могу изменить эти настройки (настройка MaxDegreeOfParallelism = 1
не имела никакого значения).
Одна из особенностей TDF заключается в том, что каждый блок независим, поэтому несколько блоков могут выполняться в нескольких потоках. Если это не то, что вы хотите, возможно, использование отдельных ActionBlock<T>
для каждого обработчика может быть не лучшим решением. На самом деле TDF может быть вовсе не лучшим решением.
Кроме того, Subscribe()
принимает Action<TMessage>
, что означает, что ваша лямбда будет скомпилирована как метод async void
. Их следует использовать только в особых (и относительно редких) случаях, когда у вас нет других вариантов. Если вы хотите поддерживать async
обработчиков, вы должны принять async Task
методов, то есть Func<TMessage, Task>
.
Причина, по которой я не получил ожидаемого поведения с Task.Delay
, заключается в том, что это задерживало выполнение каждого обработчика, а не обработку всех обработчиков. Thread.Sleep
было тем, что мне было нужно.
Использование Thread.Sleep()
противоречит самой идее асинхронности, вам не следует использовать его, если это возможно. Кроме того, я не думаю, что он действительно работал так, как вы хотели: он вводил задержку для каждого потока, но TPL Dataflow будет использовать более одного потока, поэтому это не будет вести себя так, как вы планировали.
Наконец, хотя SendAsync
позволяет мне ожидать отправки сообщения, он не позволяет мне ожидать завершения цели (ActionBlock<T>
). Я думал, что это то, что будет делать PropagateCompletion
, но, похоже, это не так. В идеале я хотел бы знать, когда все обработчики сообщения выполнены.
PropagateCompletion
вместе с Complete()
и Completion
предназначен для обработки завершения целых блоков, а не для обработки отдельного сообщения. Одна из причин этого заключается в том, что в более сложных сетях потоков данных может быть неясно, когда именно обрабатывается сообщение. Например, если сообщение уже было отправлено всем текущим целям BroadcastBlock<T>
, но будет также отправлено всем вновь добавленным целям, следует ли считать его завершенным?
Если вы хотите сделать это, вам придется каким-то образом сделать это вручную, возможно, используя TaskCompletionSource
.
person
svick
schedule
31.12.2012