построение потока графа join_node

Я экспериментирую с Intel Graph Flow от TBB. Я очень доволен результатами, и я нашел продукт потрясающим, с неограниченными возможностями. Однако я столкнулся с пб, который я исправил, но я не удовлетворен. ПБ заключается в следующем.

   message
A ----------\       tuple<message,message>             WHATEVER
   message   join ------------------------- C------------------------
B ----------/

Этот шаблон применяется, когда мы хотим синхронизировать и избежать распространения n раз сообщения (и его значения). Intel предоставляет пример, который хорошо объясняет pb (и решение - Пример Intel). Мой pb - это построенный кортеж и построение графика, в котором используется статический подход. Он полностью статичен, особенно если количество входных ребер (input_port<i> в примере Intel) к узлу соединения является переменным.

Знает ли гуру потока TBB-графа "динамический подход" к этому pb?

Лучший,

Тим [РЕДАКТИРОВАТЬ мой код, настоящий pb]

Я могу сделать:

std::vector<tbb::flow::function_node<std::size_t, message>> vec_node;

for (int i(0) ; i < 3 ;++i)
        nodes_cont_.emplace_back(my_amazing_function_to_create_node(g_));

tbb::flow::make_edge(vec_node[0], tbb::flow::input_port<0>

tbb::flow::make_edge(vec_node[1], tbb::flow::input_port<1>(node_join_));
tbb::flow::make_edge(vec_node[2], tbb::flow::input_port<2>(node_join_));

Я не могу сделать:

for(int i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));

Из-за «кортежа» и функции «tbb::flow::input_port».


person Timocafé    schedule 20.07.2017    source источник
comment
Привет, Тим, я правильно тебя понимаю, что ты хочешь динамически добавлять ребра в join_node во время выполнения и, возможно, варьировать количество сообщений, которые должны быть объединены? Кроме того, как сопоставление ключей join_node помогло решить вашу проблему? Он был разработан для объединения точных ключевых сообщений, но кортеж входных портов по-прежнему статичен.   -  person Nikita Ponomarev    schedule 21.07.2017
comment
Не могли бы вы привести реальный пример использования FG с join_node?   -  person Nikita Ponomarev    schedule 21.07.2017
comment
Обновляю вопрос реальным пб, надеюсь поможет. Спасибо за интерес   -  person Timocafé    schedule 21.07.2017


Ответы (2)


Количество портов на узле join является статическим (определяется во время компиляции). Если вам нужно различное количество входов для одного вывода, вы должны иметь возможность указать, на какой «порт» пришло сообщение, а также его ценность.

TBB имеет тип варианта, который инкапсулирует номер порта и значение (это вывод indexer_node). Если вы используете этот тип (определить indexer_node, но не создавать его экземпляр, и вы можете использовать ::output_type узла) в качестве типа ввода для multifunction_node (который потенциально может иметь более одного вывода, но может иметь только один вывод), и пусть тело функции multifunction_node решает, когда у него есть правильное количество выходов, тогда вы можете хранить значения как они являются входными, и когда multifunction_node увидит «правильное» количество входных данных, он может построить выходное значение и передать его своим преемникам.

График будет выглядеть так:

Одна проблема, которую я вижу, заключается в том, что вы должны определить тип вывода файла multifunction_node. Это также статическое объявление, хотя вам может понадобиться вариант кортежа.

РЕДАКТИРОВАТЬ:

Сделаем несколько упрощающих предположений:

  • Хотя N неизвестен во время компиляции, он известен и неизменен во время выполнения. Ослабление этого ограничения потребует передачи некоторых дополнительных данных с каждым сообщением.
  • Хотя вы использовали tuple, я полагаю, что это произошло потому, что вывод join_nodes является tuple (я пытался добавить векторы в качестве особого случая, но я не думаю, что это вариант.) Я предполагаю, что все function_nodes у вас есть в вектор имеет тот же тип, что и вывод. Таким образом, мы можем избежать использования вариантного типа.
  • Данные, которые мы передаем, не особенно велики (копирование-конструкция не является сверхдорогой). Ослабление этого ограничения потребует гораздо большей осторожности при доступе к данным в каждом узле.
  • Есть что-то, что однозначно определяет сообщения, которые идут вместе. Например, если вы передаете буфер данных только для чтения каждому function_node в векторе, адрес этого буфера является частью, которая позволяет нам знать, какие сообщения объединять.

Прошло несколько лет с тех пор, как я работал над TBB, поэтому, возможно, есть некоторые вещи, о которых я не знаю, но я могу дать вам набросок.

График будет выглядеть так:

график с использованием вектора функций

(На самом деле я набрасываю структуру соединения с сопоставлением тегов, потому что похоже, что это то, что вам нужно.)

Когда вы строите вектор function_nodes, необходимо, чтобы каждый function_body знал, каков его индекс. В общем случае это означает, что вектор состоит из указателей на function_nodes, и каждый узел построен с индексом в качестве одного из его параметров.

Я предполагаю, что вывод source_node's является чем-то вроде буфера. этот буфер передается каждому function_node в векторе, и каждый function_node имеет тип вывода, который

  • адрес буфера
  • индекс function_node в этом векторе узлов
  • прочее волшебное добро

В multifuncton_node будет выполняться большая часть работы. В нем есть

  • вектор hash_maps, проиндексированный этим индексом function_node и с ключом адреса буфера, содержащий результаты для каждого function_node для различных буферов.
  • hash_map с ключом адреса буфера, содержащим количество элементов, полученных для этого буфера. Когда он достигает N, у вас есть все ваши входы.

Когда multifunction_node получает сообщение, оно

  • добавляет данные в hash_map[i][key], где i — индекс function_node (во входном сообщении), а ключ — адрес буфера
  • увеличивается hash_count[key]. Если это сейчас N, то
  • построить вектор значений результата, извлекая каждое из хеш-таблицы для этого индекса.
  • перенаправьте это значение, если вы его построили, иначе просто вернитесь.

Есть некоторые опасения по поводу того, как данные передаются и хранятся, и как очищать элементы, если вы ожидаете, что значения будут повторно использоваться, но это основной набросок.

person cahuson    schedule 21.07.2017
comment
Хорошо, мне нужно изучить, понять и закодировать это. Затем я возвращаюсь. я обновил свой пример - person Timocafé; 21.07.2017
comment
большой вверх! Большое спасибо ! - person Timocafé; 24.07.2017
comment
нп. Не забывайте, что узлы будут работать асинхронно, поэтому, если вы изменяете общие данные, вы должны сериализовать эти обращения. Вы можете использовать smart_pointers для передачи данных, но это касается только освобождения неиспользуемых данных, а не сериализации доступа. Удачи. - person cahuson; 24.07.2017
comment
Вы также можете создать вектор function_nodes с емкостью, установленной на N, а затем добавить узлы с function_body, построенными с помощью индекса. Но я всегда путаюсь, когда происходит ссылка на тело функции и когда она передается по ссылке, поэтому я остаюсь с указателями в векторе. - person cahuson; 24.07.2017
comment
Небольшое обновление после двухмесячной головной боли: мы можем создать динамический join_node, используя метод multifunction_node. Наш график стал более динамичным, что очень круто. Спасибо за поддержку - person Timocafé; 05.10.2017

Если вы знаете N во время компиляции для конкретной программы, но хотите реализовать график в общем виде для библиотеки, которая будет использоваться в разных программах, BOOST_PP — это вариант для небольшого N.

Я реализовал граф, который генерирует continue_msg после того, как самый медленный подключенный узел выводит continue_msg. Для этого мне понадобилось несколько N буферных узлов и подключить их к узлу соединения с N портами одного типа (tbb::flow::continue_msg).

По сути, приведенный ниже код делает то, что вы намеревались

for(int i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));

... но использует прекомпилятор для «записи» нескольких строк с правильными вызовами make_edge, но только до N (для N ‹ MY_JOIN_NODE_VARIADIC_MAX этот выбор произволен, чтобы ограничить его «маленьким» N):

    #include "boost/preprocessor/repetition/repeat_from_to.hpp"
    #include "boost/preprocessor/repetition/repeat.hpp"
    #include "boost/preprocessor/arithmetic/inc.hpp"

    ...

    #define MY_JOIN_NODE_VARIADIC_MAX 8
    #define MY_FUNC_IMPL(z, n, unused) tbb::flow::make_edge(vec_node[##n], tbb::flow::input_port<##n>(joinNode));
    #define MY_MAKE_IMPL(z, n, unused)                                  \
    template <size_t N, typename TJoinNode> void                        \
    makeAllEdges (TJoinNode& joinNode,                                  \
                     typename std::enable_if< N == n >::type * = 0)     \
    {                                                                   \
        BOOST_PP_REPEAT(n, MY_FUNC_IMPL, unused)                          \
    }
    BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_INC(MY_JOIN_NODE_VARIADIC_MAX), MY_MAKE_IMPL, unused)
    #undef MY_MAKE_IMPL
    #undef MY_FUNC_IMPL
    #undef MY_JOIN_NODE_VARIADIC_MAX

Этот код является определением функции. Затем можно вызвать "makeAllEdges". (Обратите внимание, что в этом примере я предполагаю, что makeAllEdges — это метод класса, а vec_node — член класса, и, как таковой, он известен в контексте makeAllEdges.)

person Veroni    schedule 15.03.2019