Как синхронно объединить два конвейера TPL DataFlow?

Я хочу написать приложение, которое оценивает данные датчиков с двух датчиков. Оба датчика отправляют свои данные в Package объектах, которые разделены на Frame объекта. Package - это, по сути, Tuple<Timestamp, Data[]>, Frame - это Tuple<Timestamp, Data>. Затем мне нужно всегда использовать Frame с самой ранней меткой времени из обоих источников.

Итак, в основном мой объектный поток

Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /

Пример

Предположим, что каждый Package содержит 2 или 3 значения (в действительности: 5-7) и целочисленные временные метки, которые увеличиваются на 1 (в действительности: ~ 200 Гц => ~ 5 мсек). "Данные" просто timestamp * 100 для простоты.

Packages (timestamp, values[])

Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
 (29, [2700, 2800, 2900]), ...}

Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
 (26, [2400, 2500, 2600]), ...}

После (1:n) шагов:

Frames (timestamp, value)

Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
 (29, 2900), ...}

Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

После pair synchronized шага:

Merged tuples (timestamp, source1, source2)

{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

Обратите внимание, что отметка времени 23 отсутствует, поскольку ни один из обоих источников не отправил значение. Это просто побочный эффект. Могу я вставить пустой кортеж или нет, не имеет значения. Также не имеет значения, является ли кортеж (27, 2700, 2700) или ((27, 2700), (27, 2700)), т.е. е. Tuple<Timestamp, Data, Data> или Tuple<Frame, Frame>.


Я почти уверен, что часть (1:n) должна быть _ 22_, если я правильно понял документацию.

Но какой блок мне использовать для pair synchronized части? Сначала я подумал, что _ 24_ было бы тем, что я искал, но, похоже, он просто объединяет два элемента по индексу. Но поскольку не гарантируется, что оба конвейера начинаются с одной и той же временной метки, и что оба конвейера всегда будут генерировать постоянный поток непрерывных временных меток (потому что иногда пакеты с несколькими кадрами могут быть потеряны при передаче), это не вариант. Так что мне нужно больше «MergeBlock» с возможностью решать, какой элемент обоих входных потоков передать на выход следующим (если есть).

Я подумал, что мне придется написать что-то подобное самому. Но у меня проблемы с написанием кода, который правильно обрабатывает две переменные ISourceBlock и одну переменную ITargetBlock. Я в принципе застрял как можно раньше:

private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
  var frame1 = source1.Receive();
  var frame2 = source2.Receive();

  //Loop {
  //  Depending on the timestamp [mis]match,
  //  either pair frame1+frame2 or frame1+null or null+frame2, and
  //  replace whichever frame(s) was/were propagated already
  //  with the next frame from the respective pipeline
  //}
}

Я даже не уверен насчет этого черновика: должен ли метод быть async, чтобы я мог использовать var frame1 = await source1.ReceiveAsnyc();? В каком состоянии петля? Где и как проверить комплектность? Как решить очевидную проблему, заключающуюся в том, что мой код означает, что я должен ждать, пока разрыв в потоке не закончится, чтобы понять, что разрыв произошел?

Альтернатива, о которой я подумал, состоит в том, чтобы добавить дополнительный блок в конвейеры, гарантируя, что в конвейер помещается достаточное количество «сигнальных кадров» для каждого датчика, так что выравнивание всегда первого из каждого конвейера будет выравнивать правильные два. Я предполагаю, что это будет своего рода TransformManyBlock, который читает фрейм, сравнивает "ожидаемую" временную метку с фактической временной меткой, а затем вставляет сигнальные кадры для отсутствующих временных меток, пока временная метка кадра снова не станет правильной.

Или часть pair synchronized - это место, где можно остановиться на объектах потока данных TPL и запустить фактический код, который уже работает с частью Data?


person LWChris    schedule 22.04.2019    source источник
comment
Для записей: у меня есть сильное чувство, что попытка синхронизировать два конвейера DataFlow в первую очередь сводит на нет весь смысл DataFlow. DataFlow, похоже, позволяет вам обрабатывать данные как можно быстрее, не думая о потоках или циклах ... Так что, возможно, это вообще мусор, и я должен как можно быстрее вывести данные в поток результатов, а затем объединить их когда второй поток также имеет результат позже?   -  person LWChris    schedule 22.04.2019
comment
всегда использовать фрейм с самой ранней меткой времени из обоих источников не могли бы вы прояснить это немного, похоже, это именно то, что вы получили бы с JoinBlock, при каких условиях вы хотите, чтобы ваше слияние присоединялось к Frame с нулевой?   -  person JSteward    schedule 23.04.2019
comment
@JSteward Из обоих разделенных конвейеров я беру текущий элемент из очереди. Это будет самая ранняя отметка времени для очереди. Итак, у меня есть отметки времени двух элементов. Либо отметки времени выравниваются, затем мне нужно их соединить, либо они не выравниваются, тогда мне нужно использовать только более ранний и ждать, пока этот конвейер, так сказать, догонит. Но даже если они синхронизированы, это не означает, что они и впредь будут синхронизироваться.   -  person LWChris    schedule 23.04.2019
comment
@JSteward Я добавил подробный пример.   -  person LWChris    schedule 23.04.2019


Ответы (2)


Вот реализация блока SynchronizedJoinBlock, аналогичного тому, который представлен в ответе Харди Хобека. Он заботится о некоторых незначительных деталях, таких как отмена, обработка исключений и обработка оставшихся элементов, когда входные блоки Target1 и Target2 помечены как завершенные. Кроме того, логика слияния не включает рекурсию, которая должна улучшить ее работу (надеюсь, я ее не измерял) и не быть восприимчивой к исключениям переполнения стека. Небольшое отклонение: на выходе будет ValueTuple<T1, T2> вместо Tuple<T1, T2> (с целью сокращения распределения).

public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    private readonly object _locker = new object();

    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };
        Task.WhenAny(inputTasks).Unwrap().ContinueWith(t =>
        {
            // If ANY input block fails, then the whole block has failed
            ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
            if (!_input1.Completion.IsCompleted) _input1.Complete();
            if (!_input2.Completion.IsCompleted) _input2.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.OnlyOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    public ITargetBlock<T1> Target1 => _input1;
    public ITargetBlock<T2> Target2 => _input2;
    public Task Completion => _output.Completion;

    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    public void Complete() => _output.Complete();

    public void Fault(Exception exception)
        => ((IDataflowBlock)_output).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}

Пример использования:

var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));

var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),
    (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),
    (27, 2700), (28, 2800), (29, 2900)};

var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),
    (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),
    (25, 2500), (26, 2600)};

Array.ForEach(source1, x => joinBlock.Target1.Post(x));
Array.ForEach(source2, x => joinBlock.Target2.Post(x));

joinBlock.Target1.Complete();
joinBlock.Target2.Complete();

while (joinBlock.OutputAvailableAsync().Result)
{
    Console.WriteLine($"> Received: {joinBlock.Receive()}");
}

Выход:

Получено: ((0, 0), (15, 1500))
Получено: ((0, 0), (16, 1600))
Получено: ((17, 1700), (17, 1700) )
Получено: ((18, 1800), (18, 1800))
Получено: ((19, 1900), (19, 1900))
Получено: ((20, 2000), ( 20, 2000))
Получено: ((21, 2100), (21, 2100))
Получено: ((22, 2200), (0, 0))
Получено: ((0, 0), (24, 2400))
Получено: ((25, 2500), (25, 2500))
Получено: ((26, 2600), (26, 2600))
Получено: ((27, 2700), (0, 0))
Получено: ((28, 2800), (0, 0))
Получено: ((29, 2900), (0, 0))

Предполагается, что входящие данные упорядочены.

Этот класс имеет аналогичную структуру с классом JoinDependencyBlock, который я опубликовал некоторое время назад в несколько связанном вопросе.

person Theodor Zoulias    schedule 05.11.2019
comment
Переключено с Lists на Queues для внутренней памяти. - person Theodor Zoulias; 06.11.2019
comment
Это выглядит потрясающе. Ваше предположение, что входные данные были упорядочены, верно. Я протестирую это и приму это, если это сработает. - person LWChris; 09.11.2019
comment
Это хорошая новость, потому что, если это не было заказано, вам придется передать компараторы для T1 и T2 в дополнение к comparison лямбда, а также заменить внутренние Queue некоторой упорядоченной структурой данных. И .NET не предлагает никакой эффективной упорядоченной коллекции, допускающей дублирование. - person Theodor Zoulias; 10.11.2019
comment
Я наконец-то понял, как это работает. В принципе, если в обеих очередях есть хотя бы один элемент, истощите их по мере необходимости. Прервите истощение, как только одна очередь станет пустой. Каждый метод заполнения одной очереди повторно запускает этот механизм истощения. - person LWChris; 22.11.2019
comment
Поскольку FindAndPostMatched_Unsafe выполняется в lock контексте Add1 и Add2, добавление дополнительных значений к любым Queue<T> блокируется до тех пор, пока одна очередь не будет исчерпана. Предположим, что 2 начинается намного позже, чем 1, может случиться так, что при добавлении первого кадра из _queue2 необходимо удалить огромное количество кадров из _queue1, что надолго заблокирует оба метода Add#. Было бы лучше использовать _9 _ и поместить Enqueue вызовы в Add1 и Add2 вне контекста lock? - person LWChris; 22.11.2019
comment
@LWChris Я не думаю, что это будет проблемой, потому что вызовы методов внутри lock очень дешевы (за исключением comparison, который предоставляется вызывающим, так что это может быть что угодно). Я не думаю, что в этом случае два Queue можно безопасно заменить на ConcurrentQueue, потому что ими манипулируют в тандеме. Вы можете рассмотреть возможность получения-снятия блокировки для каждого цикла внутри FindAndPostMatched, что добавит общие (незначительные) накладные расходы, но предотвратит длительную блокировку в крайнем случае, о котором вы упомянули. - person Theodor Zoulias; 22.11.2019
comment
Хм, да ... учитывая, что в моем случае я записываю только с частотой 200 Гц, добавление одного элемента каждые 5 мс должно быть достаточно времени, чтобы исчерпать много данных из очереди на много миль впереди другой. Единственная другая проблема, которую можно вызвать, это то, что lock не гарантирует FIFO, но код будет самовосстанавливающимся, просто соединение пары слишком много кадров с null. - person LWChris; 22.11.2019
comment
@LWChris Я только что прочитал связанный вопрос. В вашем случае я думаю, что, поскольку за блокировку будут конкурировать только два потока, скрытый поток не сможет украсть блокировку у другого потока, который запросил его ранее. Некоторое время назад я провел несколько тестов для изучения конкуренции блокировок и заметил, что два потока могут получать одну и ту же блокировку миллион раз в секунду каждый, почти без конкуренции. Так что 200 раз в секунду - не о чем беспокоиться. :-) - person Theodor Zoulias; 22.11.2019

Проблема с TPL DataFlow API в том, что все является внутренним / частным и / или запечатанным. Это дает не так много возможностей для расширения API.

В любом случае для вашей проблемы было бы неплохо реализовать новый класс SynchronizedJoinBlock. Фактическая бизнес-логика находится в методе GetMessagesRecursive:

    public sealed class SynchronizedJoinBlock<T1, T2>
        : IReceivableSourceBlock<Tuple<T1, T2>>
    {
        private readonly object _syncObject = new object();
        private readonly Func<T1, T2, int> _compareFunction;
        private readonly Queue<T1> _target1Messages;
        private readonly Queue<T2> _target2Messages;
        private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;
        private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;
        private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;
        private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;

        public ITargetBlock<T1> Target1 => _target1;

        public ITargetBlock<T2> Target2 => _target2;

        public Task Completion => _transformManyBlock.Completion;

        public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)
        {
            _compareFunction = compareFunction
                ?? throw new ArgumentNullException(nameof(compareFunction));
            _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);
            _target1Messages = new Queue<T1>();
            _target2Messages = new Queue<T2>();

            Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>
            {
                lock (_syncObject)
                {
                    if (_target1Messages.Count > 0 && _target2Messages.Count > 0)
                    {
                        return GetMessagesRecursive(_target1Messages.Peek(), _target2Messages.Peek()).ToArray();
                    }
                    else
                    {
                        return new Tuple<T1, T2>[0];
                    }
                }
            };

            _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>
            {
                _target1Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target1.LinkTo(_batchedJoinBlock.Target1, new DataflowLinkOptions() { PropagateCompletion = true });

            _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>
            {
                _target2Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target2.LinkTo(_batchedJoinBlock.Target2, new DataflowLinkOptions() { PropagateCompletion = true });

            _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(
                element => element.Item1.Concat(element.Item2)
            );
            _batchedJoinBlock.LinkTo(_transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)
        {
            int result = _compareFunction(value1, value2);
            if (result == 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());
            }
            else if (result < 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));

                if (_target1Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))
                    {
                        yield return item;
                    }
                }
            }
            else
            {
                yield return Tuple.Create(default(T1), _target2Messages.Dequeue());

                if (_target2Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))
                    {
                        yield return item;
                    }
                }
            }
        }

        public void Complete()
        {
            _target1.Complete();
            _target2.Complete();
        }

        Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(
            DataflowMessageHeader messageHeader,
            ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ConsumeMessage(messageHeader, target, out messageConsumed);
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            ((IDataflowBlock)_transformManyBlock).Fault(exception);
        }

        public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,
            DataflowLinkOptions linkOptions)
        {
            return _transformManyBlock.LinkTo(target, linkOptions);
        }

        void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReleaseReservation(messageHeader, target);
        }

        bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReserveMessage(messageHeader, target);
        }

        public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
        {
            return _transformManyBlock.TryReceive(filter, out item);
        }

        public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)
        {
            return _transformManyBlock.TryReceiveAll(out items);
        }
    }
person Hardy Hobeck    schedule 10.10.2019
comment
отлично. и вы абсолютно правы в отношении (что я считаю) главного ограничения в остальном очень хорошей библиотеки TPL Dataflow. - person davidbak; 11.10.2019