Flink statefun совмещенные функции коммуникации

У меня есть правильно работающая встроенная работа, и я хочу развернуть дополнительные совместные рабочие места. Эти недавно добавленные вакансии будут получать сообщения от старой работы и отправлять их в тему kafka.

код, как показано ниже

@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    FunctionProvider provider = new FunctionProvider();
    binder.bindFunctionProvider( CoLocated.TYPE,provider );

    binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
  }
}

Я получаю сообщение об ошибке, как показано ниже

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

ошибка довольно очевидна, я хочу, чтобы я определил вход.

В ссылке есть аналогично определенный встроенный модуль -> https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module

Новые определенные модули будут получать сообщения от другого модуля и отправлять их в kafka.

  • Должен ли я определять входящий трафик для каждой совместной работы? Если нет, как я могу заставить это работать?
  • Как я могу найти совместную работу для общения? Достаточно ли использовать один и тот же FunctionType?
  • Обмениваются ли совмещенные функции через вход / выход?

person Arif Ezberci    schedule 03.05.2020    source источник


Ответы (1)


Ответы встроены, и, к вашему сведению, ничего из того, что вы спрашиваете, не совпадает с конкретным. Эти свойства сохраняются для удаленных модулей и заданий, которые содержат смешанные рабочие нагрузки, совместно размещенные и удаленные.

Должен ли я определять входящий трафик для каждой совместной работы? Если нет, как я могу заставить это работать?

Да, каждое задание (удаленное или размещенное в одном месте) требует как минимум одного входа. Вход - это канал, по которому сообщения из внешнего мира поступают в приложение statefun. Подумайте о Кафке или Кинезисе. Без входа задание никогда бы ничего не сделало, потому что не было бы начальных сообщений для начала обработки.

К каждому входу вы привяжете 1 или несколько маршрутизаторов, которые принимают каждое сообщение из входа и перенаправляют их 0 или более функциям в зависимости от их типов функций [1].

Как я могу найти совместную работу для общения? Достаточно ли использовать один и тот же FunctionType?

Да, функции просто отправляют друг другу сообщения, используя свои типы функций.

Обмениваются ли совмещенные функции через вход / выход?

Нет, сообщения передаются между функциями с использованием среды выполнения Apache Flink, которая содержит высоко оптимизированный сетевой стек. Как только сообщение извлекается из входящего потока, оно больше никогда не взаимодействует с входящим потоком. Если интересно, вы можете прочитать о том, как работает сетевой стек Flink, в некоторых сообщениях в блогах, написанных сообществом, но это не обязательно для успешного использования statefun в продакшене [2].

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/index.html#router

[2] https://flink.apache.org/2019/06/05/flink-network-stack.html

person Seth    schedule 04.05.2020