TPL Dataflow Broadcastblock отбрасывает последнее сообщение

У меня довольно простая проблема. Мне нужен способ легко выполнять некоторую обработку сообщений, которая занимает некоторое время. В процессе обработки могут быть введены новые запросы, но все запросы, кроме последнего, могут быть отброшены.

Поэтому я подумал, что TPL Broadcastblock должен делать именно это, глядя на документацию и сообщения, например, на StackExchange. Я создал следующее решение и добавил для него несколько модульных тестов, но в модульных тестах иногда последний элемент не отправляется.

Это не то, что я ожидал. Если он должен что-то удалить, я бы сказал, что он должен удалить первый элемент, поскольку он должен перезаписать свой буфер, равный 1, если он не может обработать сообщение. Кто-нибудь может увидеть, что это такое?
Будем очень признательны за любую помощь!

Вот код блока:

/// <summary>
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed
/// will be discarded.
/// </summary>
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T>
{
    private readonly BroadcastBlock<T> broadcastBlock;

    private readonly ActionBlock<T> actionBlock;

    /// <summary>
    /// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class.
    /// Constructs a SyncFilterTarget{TInput}.
    /// </summary>
    /// <param name="actionToPerform">Thing to do.</param>
    public DiscardWhileBusyActionBlock(Action<T> actionToPerform)
    {
        if (actionToPerform == null)
        {
            throw new ArgumentNullException(nameof(actionToPerform));
        }

        this.broadcastBlock = new BroadcastBlock<T>(item => item);
        this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
        this.broadcastBlock.LinkTo(this.actionBlock);
        this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete());
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    public void Complete()
    {
        this.broadcastBlock.Complete();
    }

    public void Fault(Exception exception)
    {
        ((ITargetBlock<T>)this.broadcastBlock).Fault(exception);
    }

    public Task Completion => this.actionBlock.Completion;
}

И вот код для теста:

[TestClass]
public class DiscardWhileBusyActionBlockTest
{
    [TestMethod]
    public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        buffer.Post(1);

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    [TestMethod]
    public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        actionPerformer.SetBusy();

        // 1st message will set the actionperformer to busy, 2nd message should be sent when
        // it becomes available.
        buffer.Post(1);
        buffer.Post(2);

        actionPerformer.SetAvailable();

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1, 2 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    [TestMethod]
    public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        actionPerformer.SetBusy();

        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);
        buffer.Post(4);
        buffer.Post(5);

        actionPerformer.SetAvailable();

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1, 5 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        source.Complete();
        target.Completion.Wait(TimeSpan.FromSeconds(1));
    }

    private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block)
    {
        var buffer = new BufferBlock<int>();
        buffer.LinkTo(block);
        buffer.Completion.ContinueWith(task => block.Complete());
        return buffer;
    }

    private class ActionPerformer
    {
        private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);

        public List<int> LastReceivedMessage { get; } = new List<int>();

        public void Perform(int message)
        {
            this.resetEvent.WaitOne(TimeSpan.FromSeconds(3));
            this.LastReceivedMessage.Add(message);
        }

        public void SetBusy()
        {
            this.resetEvent.Reset();
        }

        public void SetAvailable()
        {
            this.resetEvent.Set();
        }
    }
}

person Michel Klonen    schedule 31.07.2017    source источник


Ответы (1)


Когда вы выравниваете BoundedCapacity блока действий на 1, это означает, что если он выполняет обработку и уже имеет элемент в своей очереди, он отбрасывает сообщение, которое выходит за рамки. Итак, в основном происходит то, что ваш блок выполняет свою работу, отклоняя новое сообщение, пока буфер заполнен. После этого выполняется широковещательный блок, поскольку получателям отправляются целые сообщения, и он вызывает Completion, завершающий весь конвейер.

Вам нужно либо проверить возвращенное логическое значение Post для последних сообщений, либо, что более вероятно, сохранить последнее сообщение в какой-либо переменной, гарантируя, что оно попадет в конвейер. Похоже, вам лучше не использовать BroadcastBlock, поскольку его цель предоставьте копию сообщения количеству связанных блоков и просто напишите свою логику самостоятельно. Возможно, вместо этого вы можете использовать простой BufferBlock.

Обновление: OfferMessage метод также предоставляет информацию о предлагаемом сообщении. Я думаю, вам вообще не нужен буферный блок, так как вам приходится иметь дело с логикой, отличной от стандартной для вашего конвейера. Проще иметь поле типа _lastMessage, хранить в нем последнее значение и стирать его, когда запрос принимается actionBlock. Вы даже можете полностью удалить зависимость потока данных, так как все, что вы делаете, это вызывает метод для запроса.

Дополнительные примечания: вы можете связать блоки с распространение завершения задается в опциях:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
this.broadcastBlock.LinkTo(this.actionBlock, linkOptions);

Это может удалить часть вашего кода с потенциально опасными ContinueWith использование. Также вы можете использовать await broadcastBlock.SendAsync() вместо Post, если вам нужно асинхронное поведение.

person VMAtm    schedule 31.07.2017
comment
Спасибо за ответ (и примечание;) Итак, если я правильно вас понял, Broadcastblock выполняет свою работу по перезаписи своего буфера, но Actionblock на самом деле их удаляет? Интересный! Однако я не понимаю вашего другого комментария при проверке возвращаемого значения Post. Я не отправляю сообщения в блок действий, это делается методом OfferMessage. Однако Bufferblock не будет работать, поскольку он буферизует все входящие сообщения, в то время как я хочу иметь буфер только из одного, который перезаписывается каждый раз, когда сообщение получено, когда получатель занят. Не могли бы вы уточнить это, пожалуйста? - person Michel Klonen; 02.08.2017
comment
Обновленный ответ. - person VMAtm; 02.08.2017
comment
Хорошо, спасибо за объяснение. Поэтому я делаю вывод, что не могу сделать это, используя встроенную логику TPL, и должен найти собственное решение. Спасибо еще раз! - person Michel Klonen; 08.08.2017