К сожалению, нет способа (без динамической компиляции) изменить количество выходных портов в multifunction_node. Вы можете создать максимальное количество портов (которое контролируется макропереключателем и зависит от компилятора) и просто динамически подключаться к портам. Если вы выполняете команду try_put для порта, а преемник не подключен, попытка try_put завершается ошибкой, и вы можете отреагировать на это во время выполнения.
Другой способ сделать это (хотя и с некоторым разочарованием, как мне кажется) — построить бинарное дерево из двухпортовых multifunction_nodes. Если вы используете класс с назначением вывода в качестве поля, сконструируйте каждый узел так, чтобы он реагировал на один бит адресата и выводился на порт 0 или порт 1, в зависимости от результата маски. короткое замыкание планировщика будет относительно быстро направлять вывод по дереву, но вы заплатите немного штрафа за множественные динамические вызовы.
Или вы можете использовать какое-то другое основание, кроме 2 (например, 10).
Дополнение: после разговора с Майком (разработчиком flow::graph) мы поняли, что есть еще один способ справиться с этим, который позволяет использовать динамическое количество портов. Вам придется сделать немного низкоуровневых вещей, но это выглядит так:
#include "tbb/tbb.h"
#include <iostream>
using namespace tbb::flow;
tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
output_port_vector_t &my_ports;
public:
multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
continue_msg operator()(const int in) {
int current_size = my_ports.size();
if(in >= current_size) {
// error condition? grow concurrent_vector?
tbb::spin_mutex::scoped_lock gl(io_lock);
std::cout << "Received input out of range(" << in << ")" << std::endl;
}
else {
// do computation
my_ports[in]->try_put(in*2);
}
return continue_msg();
}
};
struct output_function_body {
int my_prefix;
output_function_body(int i) : my_prefix(i) { }
int operator()(const int i) {
tbb::spin_mutex::scoped_lock gl(io_lock);
std::cout << " output node "<< my_prefix << " received " << i << std::endl;
return i;
}
};
int main() {
graph g;
output_port_vector_t output_ports;
function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
// create broadcast_nodes
for( int i = 0; i < 20; ++i) {
bnode_element_t *bp = new bnode_element_t(g);
output_ports.push_back(bp);
}
// attach the output nodes to the broadcast_nodes
for(int i = 0; i < 20; ++i) {
function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
make_edge(*(output_ports[i]),*fp);
}
for( int i = 0; i < 21; ++i) {
my_node.try_put(i);
}
g.wait_for_all();
return 0;
}
Примечания к вышеизложенному:
- Мы создаем
concurrent_vector
указателей на broadcast_nodes
. Преемники function_node
присоединяются к этим broadcast_nodes
. Вывод function_node
игнорируется.
- Concurrent_vector передается конструктору
multioutput_function_body
. В этом случае нам вообще не нужен multifunction_node. multioutput_function_body
решает, от broadcast_node
до try_put
во время выполнения. Обратите внимание, мы делаем явное try_puts
для broadcast_nodes
. Это приводит к созданию задачи для каждого try_put
. Порожденные задачи выполняются быстрее, чем задачи, поставленные в очередь, но накладные расходы на планирование выше, чем просто возврат значения из узла.
- Я не добавлял очистку выделенного в куче
broadcast_nodes
и вывода function_nodes
. «Очевидным» местом для удаления broadcast_nodes
будет деструктор multioutput_function_body
. Вы не должны этого делать, так как создание function_node
приводит к созданию копии переданных тел функций, а несколько копий function_body
будут иметь ссылку на concurrent_vector broadcast_node
указателей. Сделайте удаление после g.wait_for_all()
.
Я использовал concurrent_vector
, потому что он разрешает доступ к указателям во время изменения concurrent_vector
. Вопрос о том, можно ли добавлять дополнительные broadcast_node
указателя во время выполнения графа, остается открытым. Надеюсь, вы только создаете узлы и используете их как есть, а не модифицируете их на лету. concurrent_vectors
не перераспределять и не перемещать уже инициализированные элементы при наращивании структуры; именно поэтому я использовал его, но не думайте, что это полный ответ, если вы надеетесь добавить дополнительные узлы во время работы графа.
person
cahuson
schedule
28.10.2015