Я хочу написать приложение, которое оценивает данные датчиков с двух датчиков. Оба датчика отправляют свои данные в Package
объектах, которые разделены на Frame
объекта. Package
- это, по сути, Tuple<Timestamp, Data[]>
, Frame
- это Tuple<Timestamp, Data>
. Затем мне нужно всегда использовать Frame
с самой ранней меткой времени из обоих источников.
Итак, в основном мой объектный поток
Package -(1:n)-> Frame \
}-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /
Пример
Предположим, что каждый Package
содержит 2 или 3 значения (в действительности: 5-7) и целочисленные временные метки, которые увеличиваются на 1 (в действительности: ~ 200 Гц => ~ 5 мсек). "Данные" просто timestamp * 100
для простоты.
Packages (timestamp, values[])
Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
(29, [2700, 2800, 2900]), ...}
Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
(26, [2400, 2500, 2600]), ...}
После (1:n)
шагов:
Frames (timestamp, value)
Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
(22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
(29, 2900), ...}
Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
(20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}
После pair synchronized
шага:
Merged tuples (timestamp, source1, source2)
{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
(19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
(24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}
Обратите внимание, что отметка времени 23
отсутствует, поскольку ни один из обоих источников не отправил значение. Это просто побочный эффект. Могу я вставить пустой кортеж или нет, не имеет значения. Также не имеет значения, является ли кортеж (27, 2700, 2700)
или ((27, 2700), (27, 2700))
, т.е. е. Tuple<Timestamp, Data, Data>
или Tuple<Frame, Frame>
.
Я почти уверен, что часть (1:n)
должна быть _ 22_, если я правильно понял документацию.
Но какой блок мне использовать для pair synchronized
части? Сначала я подумал, что _ 24_ было бы тем, что я искал, но, похоже, он просто объединяет два элемента по индексу. Но поскольку не гарантируется, что оба конвейера начинаются с одной и той же временной метки, и что оба конвейера всегда будут генерировать постоянный поток непрерывных временных меток (потому что иногда пакеты с несколькими кадрами могут быть потеряны при передаче), это не вариант. Так что мне нужно больше «MergeBlock» с возможностью решать, какой элемент обоих входных потоков передать на выход следующим (если есть).
Я подумал, что мне придется написать что-то подобное самому. Но у меня проблемы с написанием кода, который правильно обрабатывает две переменные ISourceBlock и одну переменную ITargetBlock. Я в принципе застрял как можно раньше:
private void MergeSynchronized(
ISourceBlock<Frame> source1,
ISourceBlock<Frame> source2,
ITargetBlock<Tuple<Frame, Frame>> target)
{
var frame1 = source1.Receive();
var frame2 = source2.Receive();
//Loop {
// Depending on the timestamp [mis]match,
// either pair frame1+frame2 or frame1+null or null+frame2, and
// replace whichever frame(s) was/were propagated already
// with the next frame from the respective pipeline
//}
}
Я даже не уверен насчет этого черновика: должен ли метод быть async
, чтобы я мог использовать var frame1 = await source1.ReceiveAsnyc();
? В каком состоянии петля? Где и как проверить комплектность? Как решить очевидную проблему, заключающуюся в том, что мой код означает, что я должен ждать, пока разрыв в потоке не закончится, чтобы понять, что разрыв произошел?
Альтернатива, о которой я подумал, состоит в том, чтобы добавить дополнительный блок в конвейеры, гарантируя, что в конвейер помещается достаточное количество «сигнальных кадров» для каждого датчика, так что выравнивание всегда первого из каждого конвейера будет выравнивать правильные два. Я предполагаю, что это будет своего рода TransformManyBlock
, который читает фрейм, сравнивает "ожидаемую" временную метку с фактической временной меткой, а затем вставляет сигнальные кадры для отсутствующих временных меток, пока временная метка кадра снова не станет правильной.
Или часть pair synchronized
- это место, где можно остановиться на объектах потока данных TPL и запустить фактический код, который уже работает с частью Data
?
JoinBlock
, при каких условиях вы хотите, чтобы ваше слияние присоединялось кFrame
с нулевой? - person JSteward   schedule 23.04.2019