Как обрабатывать DataStream с разными типами данных вместе

скажем, у меня есть функция, которая обрабатывает DataStream<X> и отправляет возврат в БД, но мне нужно читать из другого источника, и при обработке этого нового потока данных мне нужно будет найти состояния, которые я мог бы сгенерировать, прежде чем сохранять DataStream<X> в DB и найдите один идентификатор, который поступает в DataStream<Y>, а затем инициируйте действие.

У меня вопрос:

Возможно ли использование, например, Co-ProcessFunction в Flink для обработки результата преобразования в DataStream<X> и создания там состояний и в то же время обработки DataStream<Y>, чтобы состояния и новый поток находились в одном операторе?

если первый вопрос полностью неверен, что может быть возможно, есть ли как-нибудь сделать то, что мне нужно делать?

Надеюсь, кто-то поймет, что мне нужно делать.

Это графическое представление того, что мне нужно сделать. введите описание изображения здесь


person Alejandro Deulofeu    schedule 22.11.2020    source источник


Ответы (2)


Да, можно соединить два потока разных типов и обрабатывать их вместе, используя общее состояние.

Чтобы соединить Stream<X> с Stream<Y> и чтобы они совместно использовали состояние, вам нужно будет определить функции селектора ключей, которые возвращают эквивалентные ключи для обоих потоков. (Как и в SQL, где для объединения двух таблиц необходимо описать, как их можно объединить.)

В этом псевдокоде anotherFlinkFunction - это RichCoFlatMapFunction. Я предположил, что оба потока имеют поле id, которое имеет одинаковое значение, когда элементы из потока X и потока Y должны быть объединены.

x = env.addSource(...);
xTransformed = x.flatMap(...);
xTransformed.addSink(DB);

y = env.addSource(...);

z = xTransformed
  .connect(y)
  .keyBy(xt->xt.id, y->y.id)
  .flatMap(new anotherFlinkFunction());

z.addSink(...);

Соответствующие примеры вы найдете в учебных пособиях по Apache Flink по адресу https://ci.apache.org/projects/flink/flink-docs-stable/learn-flink/etl.html#example и в сопутствующем упражнении на https://github.com/apache/flink-training/tree/master/rides-and-fares.

person David Anderson    schedule 22.11.2020
comment
Да, это именно то, что я делал до сих пор, но один вопрос: можно ли выполнить соединение, не выполняя keyBy(xt->xt.id, y->y.id)? вот так: ConnectedStreams<X, Y> connectedStreams = StreamX.connect(StreamY);, а потом .flatMap(new anotherFlinkFunction());? Я сделал это, но не уверен, что все в порядке, потому что идея состоит в том, чтобы иметь значения в состояниях для Stream<X>, но Stream<Y> не всегда отправляет записи как Stream<X>, поэтому я не могу сделать keyBy, или могу? - person Alejandro Deulofeu; 23.11.2020
comment
С другой стороны, у меня есть вопрос: при этом у меня будет тип данных ConnectedStreams<X, Y>, как я могу добавить Sink к этим типам данных? потому что addSink() функция здесь не разрешена. - person Alejandro Deulofeu; 23.11.2020
comment
Да, оба потока имеют поля и id с одинаковым значением. - person Alejandro Deulofeu; 23.11.2020
comment
Я не понимаю ваш первый комментарий выше, где вы спрашиваете, что Stream<Y> не всегда отправляет записи как Stream<X>, поэтому я не могу выполнить keyBy, или могу? - person David Anderson; 23.11.2020
comment
Это был неправильный вопрос, прошу прощения за это. Вы знаете какую-нибудь ссылку, которая показывает мне, как сделать погружение в ConnectedStream? - person Alejandro Deulofeu; 23.11.2020
comment
Невозможно напрямую подключить приемник к ConnectedStream. Сначала вы должны обработать подключенный поток, используя что-то вроде RichCoFlatMap или CoProcessFunction. Если вместо этого вы просто хотите объединить два потока вместе, вы можете использовать union. - person David Anderson; 23.11.2020
comment
Да, вы были правы, я забыл достать SingleOutputStreamOperator от RichCoFlatMap. - person Alejandro Deulofeu; 23.11.2020

Полный ответ здесь:

 Strem<X> streamX = fromSource();
 Strem<Y> streamY = fromSource();

 ConnectedStreams<X, Y> connectedStreams = streamX.connect(streamY).keyBy(x-> x.id, y-> y.id);

 /*JoinStreamsFunction will receive X and Y and get Z as output*/
 SingleOutputStreamOperator<Z> out = connectedStreams.flatMap(new JoinStreamsFunction());
        out.addSink(new SinkFunctionHere());
person Alejandro Deulofeu    schedule 23.11.2020