Я пытаюсь передать часть работы от одного типизированного актера другому. В руководстве пользователя CAF указано, что это можно сделать методом forward_to
. Этот метод выглядит так, как будто он доступен только для акторов, которые явно относятся к типу event_based_actor
. Однако forward_to
кажется тонкой оболочкой метода forward_current_message
, который определен для всех акторов типа local_actor
. Поэтому я предполагаю, что можно напрямую вызывать forward_current_message
?
Кроме того, для того, чтобы пересылка сообщений работала с типизированными акторами, мне по-прежнему приходилось возвращать ответ от промежуточного актора. Кажется, что ответ этого актера игнорируется, и это хорошо, но я делаю что-то не так? Или действительно необходимо оплатить (обычно минимальную) стоимость создания ответа, который не будет использован?
Вот некоторый рабочий пример кода, который демонстрирует мою попытку пересылки сообщений с типизированными акторами:
#include <iostream>
#include "caf/all.hpp"
using namespace caf;
using namespace std;
using a_type = typed_actor<replies_to<int>::with<bool>>;
using b_type = typed_actor<replies_to<int>::with<bool>>;
actor worker()
{
return spawn(
[](event_based_actor *self) -> behavior
{
return
{
[self](int index)
{
aout(self) << "Worker: " << index << endl;
return index;
}
};
});
}
b_type::behavior_type bBehavior(b_type::pointer self)
{
return
{
[self](int value)
{
// Create blocking actor
scoped_actor blockingActor;
// Spawn pool workers and send each a message
auto pool = actor_pool::make(value, worker, actor_pool::round_robin());
for(int i = 0; i < value; ++i)
{
blockingActor->send(pool, i);
}
// Wait for completion
vector<int> results;
int i = 0;
blockingActor->receive_for(i, value) (
[&results](int value)
{
results.push_back(value);
});
blockingActor->send_exit(pool, exit_reason::user_shutdown);
self->quit();
return (value == results.size());
}
};
}
class A : public a_type::base
{
protected:
behavior_type make_behavior() override
{
return
{
[this](int value) -> bool
{
aout(this) << "Number of tasks: " << value << endl;
b_type forwardDestination = spawn(bBehavior);
auto castDestination = actor_cast<actor>(forwardDestination);
this->forward_current_message(castDestination);
this->quit();
return false;
}
};
}
};
void tester()
{
a_type testeeActor = spawn<A>();
scoped_actor self;
self->sync_send(testeeActor, 5).await(
[testeeActor, &self](bool success)
{
aout(self) << "All workers completed? " << (success ? "Yes!" : "No :(") << endl;
});
}
int main()
{
tester();
await_all_actors_done();
shutdown();
cout << "Press Enter to continue" << endl;
cin.get();
}