Как сделать Intel TBB multifunction_node с динамическим количеством портов?

Я новичок в библиотеке Intel TBB. Как видите, мой вопрос связан с tbb::flow::graph. Мне нужно реализовать логику, например:

Пользователь рисует график с некоторыми логическими блоками. Каждый блок (узел) может иметь неограниченное количество соединений (ребер), поэтому каждый блок (узел) может выбирать, куда поместить данные дальше. Затем моя программа с помощью библиотеки TBB построит такой график и проведет расчеты.

Поэтому я не знаю, возможно ли создать узел (думаю, это должен быть multifunction_node) с динамическим количеством выходных портов. Не могли бы вы показать мне, как это сделать, пожалуйста?


person Max Pashkov    schedule 27.10.2015    source источник


Ответы (1)


К сожалению, нет способа (без динамической компиляции) изменить количество выходных портов в multifunction_node. Вы можете создать максимальное количество портов (которое контролируется макропереключателем и зависит от компилятора) и просто динамически подключаться к портам. Если вы выполняете команду try_put для порта, а преемник не подключен, попытка try_put завершается ошибкой, и вы можете отреагировать на это во время выполнения.

Другой способ сделать это (хотя и с некоторым разочарованием, как мне кажется) — построить бинарное дерево из двухпортовых multifunction_nodes. Если вы используете класс с назначением вывода в качестве поля, сконструируйте каждый узел так, чтобы он реагировал на один бит адресата и выводился на порт 0 или порт 1, в зависимости от результата маски. короткое замыкание планировщика будет относительно быстро направлять вывод по дереву, но вы заплатите немного штрафа за множественные динамические вызовы.

Или вы можете использовать какое-то другое основание, кроме 2 (например, 10).

Дополнение: после разговора с Майком (разработчиком flow::graph) мы поняли, что есть еще один способ справиться с этим, который позволяет использовать динамическое количество портов. Вам придется сделать немного низкоуровневых вещей, но это выглядит так:

#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
    output_port_vector_t &my_ports;
    public:
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = my_ports.size();
        if(in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }

    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }

    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}

Примечания к вышеизложенному:

  • Мы создаем concurrent_vector указателей на broadcast_nodes. Преемники function_node присоединяются к этим broadcast_nodes. Вывод function_node игнорируется.
  • Concurrent_vector передается конструктору multioutput_function_body. В этом случае нам вообще не нужен multifunction_node. multioutput_function_body решает, от broadcast_node до try_put во время выполнения. Обратите внимание, мы делаем явное try_puts для broadcast_nodes. Это приводит к созданию задачи для каждого try_put. Порожденные задачи выполняются быстрее, чем задачи, поставленные в очередь, но накладные расходы на планирование выше, чем просто возврат значения из узла.
  • Я не добавлял очистку выделенного в куче broadcast_nodes и вывода function_nodes. «Очевидным» местом для удаления broadcast_nodes будет деструктор multioutput_function_body. Вы не должны этого делать, так как создание function_node приводит к созданию копии переданных тел функций, а несколько копий function_body будут иметь ссылку на concurrent_vector broadcast_node указателей. Сделайте удаление после g.wait_for_all().

Я использовал concurrent_vector, потому что он разрешает доступ к указателям во время изменения concurrent_vector. Вопрос о том, можно ли добавлять дополнительные broadcast_node указателя во время выполнения графа, остается открытым. Надеюсь, вы только создаете узлы и используете их как есть, а не модифицируете их на лету. concurrent_vectors не перераспределять и не перемещать уже инициализированные элементы при наращивании структуры; именно поэтому я использовал его, но не думайте, что это полный ответ, если вы надеетесь добавить дополнительные узлы во время работы графа.

person cahuson    schedule 28.10.2015
comment
Блог Антона обсуждает проблемы с параллельным ростом во время доступа к concurrent_vector. Простой способ добавить элементы — использовать zero_allocator для concurrent_vector и push_back указатели на широковещательные узлы после их создания. - person cahuson; 28.10.2015
comment
Большое спасибо за Ваш ответ! К сожалению, я хочу изменить график на лету. Поэтому я реализую какое-то дерево с multifunction_nodes (может быть бинарное дерево, как вы предлагаете). В любом случае это будет небольшая головная боль)) - person Max Pashkov; 29.10.2015
comment
@Max, подумав об этом, я считаю, что вы можете одновременно увеличивать concurrent_vector во время работы графика. У вас будет недетерминизм во время выполнения, но если вы можете жить с этим и не уничтожаете узлы во время выполнения графа, все будет в порядке. - person cahuson; 29.10.2015
comment
Еще одно различие между решением multifunction_node и этим заключается в том, что широковещательные узлы всегда принимают входные данные, поэтому вы не можете использовать возвращаемое значение из try_put, чтобы определить, подключен ли другой узел к широковещательному_узлу. - person cahuson; 29.10.2015
comment
да я вижу. Решение multifunction_node кажется более гибким, но более сложным. - person Max Pashkov; 29.10.2015