Создание шины сообщений с помощью TPL Dataflow

Я искал легкую, обрабатываемую шину асинхронных сообщений и наткнулся на TPL Dataflow.

Моя текущая реализация приведена ниже (полный пример см. https://gist.github.com/4416655).

public class Bus
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

У меня есть несколько общих вопросов об использовании TPL Dataflow в сценарии обмена сообщениями.

  • Является ли BroadcastBlock<T> рекомендуемым источником для одновременной отправки сообщений нескольким обработчикам? К такому выводу я пришел на основании этот пост.
  • В моей реализации я использую один BroadcastBlock<T> экземпляр для всех типов сообщений. Может ли это вызвать проблемы при обработке большого количества сообщений? Должен ли я создавать отдельный экземпляр для каждого типа сообщения?
  • BroadcastBlock<T> всегда сохраняет последний отправленный элемент. Это означает, что при любых новых подписках (ссылках) это сообщение будет автоматически передаваться. Можно изменить это поведение (новые подписки должны получать только новые сообщения).
  • В моем тестовом приложении я ввел задержку в первом обработчике:

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

    При отправке сообщения я ожидал, что каждое сообщение будет выводиться на консоль одно за другим с шагом в 2 секунды. Вместо этого через 2 секунды все сообщения выводились сразу. Я предполагаю, что это связано с параллелизмом, выполняемым базовым планировщиком, но мне любопытно, как я могу изменить эти настройки (настройка MaxDegreeOfParallelism = 1 не имеет значения).

  • Наконец, хотя SendAsync позволяет мне ожидать отправки сообщения, он не позволяет мне ожидать завершения цели (ActionBlock<T>). Я думал, что это то, что будет делать PropagateCompletion, но, похоже, это не так. В идеале я хотел бы знать, когда все обработчики сообщения выполнены.

Обновлять

Причина, по которой я не получил ожидаемого поведения с Task.Delay, заключается в том, что это задерживало выполнение каждого обработчика, а не обработку всех обработчиков. Thread.Sleep было тем, что мне было нужно.


person Ben Foster    schedule 31.12.2012    source источник


Ответы (1)


Ответив на ваши вопросы (см. Ниже), я понял, что моделирование вашего дизайна с помощью блоков TPL Dataflow, вероятно, не лучшая идея. TDF хорош для обработки сообщений в значительной степени независимыми блоками без встроенного способа отслеживания отдельного сообщения. Но это то, что вы, кажется, хотите: обрабатывать сообщение обработчиками последовательно с отслеживанием завершения каждого сообщения.

Из-за этого, я думаю, вам не следует создавать целую сеть потоков данных, а вместо этого использовать один ActionBlock в качестве процессора асинхронных сообщений:

public class Bus
{
    class Subscription
    {
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
        {
            Id = id;
            HandlerAction = handlerAction;
        }
    }

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
    {
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
            {
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                {
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                }

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                {
                    subscriptions.Add(handlerToSubscribe);
                }

                foreach (var subscription in subscriptions)
                {
                    await subscription.HandlerAction(message);
                }

                completedAction();
            });
    }

    public Task SendAsync<TMessage>(TMessage message)
    {
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        return Subscribe<TMessage>(
            message =>
            {
                handlerAction(message);
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);
            });
    }

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        Func<object, Task> actionWithCheck = async message =>
        {
            if (message is TMessage)
                await handlerAction((TMessage)message);
        };

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        m_idsToUnsubscribe.Enqueue(subscriptionId);
    }
}

(Я решил использовать очереди для подписки и отписки, чтобы список обработчиков не менялся во время обработки сообщения.)

Ответы на ваши вопросы

Является ли BroadcastBlock<T> рекомендуемым источником для одновременной отправки сообщений нескольким обработчикам?

Да, на первый взгляд кажется, что BroadcastBlock<T> - это то, что вы хотите. Непосредственно в TPL Dataflow точно нет подобного блока.

В своей реализации я использую один экземпляр BroadcastBlock для всех типов сообщений. Может ли это вызвать проблемы при обработке большого количества сообщений? Должен ли я создавать отдельный экземпляр для каждого типа сообщения?

Имея единый блок для всех типов сообщений, вы выполняете больше работы (отправку всем обработчикам) в одном потоке. Имея один блок для каждого типа сообщения, вы будете выполнять меньше работы (отправляя только правильным обработчикам), которая может выполняться в нескольких потоках. В связи с этим, думаю, разумно предположить, что последний будет быстрее.

Но не забывайте правила оптимизации производительности приложений: во-первых, напишите простой и читаемый код. Только если окажется, что он действительно медленный, попробуйте его оптимизировать. И при сравнении двух альтернатив всегда используйте профилирование, чтобы выяснить, какая из них на самом деле быстрее, а не просто угадывайте, какая из них должна быть быстрее.

BroadcastBlock<T> всегда сохраняет последний отправленный элемент. Это означает, что при любых новых подписках (ссылках) это сообщение будет автоматически передаваться. Можно ли изменить это поведение (новые подписки должны получать только новые сообщения)?

Нет, настроить BroadcastBlock<T> для этого невозможно. Если вам не нужны все функции BroadcastBlock<T> (отправка в блоки с ограниченной емкостью, которые могут быть временно заполнены, поддержка нежадных блоков в качестве целей), вы можете написать для этого собственную версию BroadcastBlock<T>.

При отправке сообщения я ожидал, что каждое сообщение будет выводиться на консоль одно за другим с шагом 2 секунды. Вместо этого через 2 секунды все сообщения выводились сразу. Я предполагаю, что это связано с параллелизмом, выполняемым базовым планировщиком, но мне любопытно, как я могу изменить эти настройки (настройка MaxDegreeOfParallelism = 1 не имела никакого значения).

Одна из особенностей TDF заключается в том, что каждый блок независим, поэтому несколько блоков могут выполняться в нескольких потоках. Если это не то, что вы хотите, возможно, использование отдельных ActionBlock<T> для каждого обработчика может быть не лучшим решением. На самом деле TDF может быть вовсе не лучшим решением.

Кроме того, Subscribe() принимает Action<TMessage>, что означает, что ваша лямбда будет скомпилирована как метод async void. Их следует использовать только в особых (и относительно редких) случаях, когда у вас нет других вариантов. Если вы хотите поддерживать async обработчиков, вы должны принять async Task методов, то есть Func<TMessage, Task>.

Причина, по которой я не получил ожидаемого поведения с Task.Delay, заключается в том, что это задерживало выполнение каждого обработчика, а не обработку всех обработчиков. Thread.Sleep было тем, что мне было нужно.

Использование Thread.Sleep() противоречит самой идее асинхронности, вам не следует использовать его, если это возможно. Кроме того, я не думаю, что он действительно работал так, как вы хотели: он вводил задержку для каждого потока, но TPL Dataflow будет использовать более одного потока, поэтому это не будет вести себя так, как вы планировали.

Наконец, хотя SendAsync позволяет мне ожидать отправки сообщения, он не позволяет мне ожидать завершения цели (ActionBlock<T>). Я думал, что это то, что будет делать PropagateCompletion, но, похоже, это не так. В идеале я хотел бы знать, когда все обработчики сообщения выполнены.

PropagateCompletion вместе с Complete() и Completion предназначен для обработки завершения целых блоков, а не для обработки отдельного сообщения. Одна из причин этого заключается в том, что в более сложных сетях потоков данных может быть неясно, когда именно обрабатывается сообщение. Например, если сообщение уже было отправлено всем текущим целям BroadcastBlock<T>, но будет также отправлено всем вновь добавленным целям, следует ли считать его завершенным?

Если вы хотите сделать это, вам придется каким-то образом сделать это вручную, возможно, используя TaskCompletionSource.

person svick    schedule 31.12.2012
comment
Отличное решение. Мне особенно нравится идея очередей подписки / отписки. Один вопрос: как бы вы расширили это для поддержки передачи токена отмены, чтобы остановить выполнение обработчиков? - person Ben Foster; 14.01.2013
comment
Если вы хотите отменить обработку отдельного сообщения, я бы добавил cancellationToken и canceledAction к Tuple (что означает, что пользовательский класс, вероятно, будет лучше, чем Tuple в этот момент). Вы должны установить их в SendAsync() и использовать их в цикле subscritpions. В идеале handlerAction также должен принимать токен (по крайней мере, необязательно). - person svick; 14.01.2013
comment
Спасибо. Мне любопытно, почему бы не использовать TransformBlock, который возвращает Task. Тогда мы могли бы просто вернуть Task.WhenAll(subscriptions), а не использовать TaskCompletionSource. - person Ben Foster; 14.01.2013
comment
Я не уверен, что понимаю. TransformBlock по-прежнему не будет напрямую возвращать выходные данные для некоторых входных данных, вам нужно будет каким-то образом получить его результаты. Вы, конечно, можете делать то, что хотите, используя TransformBlock, но я думаю, что это было бы на самом деле сложнее, чем при использовании ActionBlock и TaskCompletionSource. - person svick; 14.01.2013
comment
Хорошо, еще раз спасибо. К вашему сведению, я отправил этот код на GitHub. github.com/benfoster/Fabrik.SimpleBus - person Ben Foster; 14.01.2013