У меня есть правильно работающая встроенная работа, и я хочу развернуть дополнительные совместные рабочие места. Эти недавно добавленные вакансии будут получать сообщения от старой работы и отправлять их в тему kafka.
код, как показано ниже
@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvider provider = new FunctionProvider();
binder.bindFunctionProvider( CoLocated.TYPE,provider );
binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
}
}
Я получаю сообщение об ошибке, как показано ниже
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
ошибка довольно очевидна, я хочу, чтобы я определил вход.
В ссылке есть аналогично определенный встроенный модуль -> https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module
Новые определенные модули будут получать сообщения от другого модуля и отправлять их в kafka.
- Должен ли я определять входящий трафик для каждой совместной работы? Если нет, как я могу заставить это работать?
- Как я могу найти совместную работу для общения? Достаточно ли использовать один и тот же FunctionType?
- Обмениваются ли совмещенные функции через вход / выход?