FLINK - как обрабатывать логику результата sql-запроса

Мое требование - обработать или построить некоторую логику вокруг результата запроса sql во flink. Для простоты скажем, что у меня есть два sql-запроса, которые они выполняют с разным размером окна и один поток событий. У меня вопрос

  • а) как я узнаю, для какого результата запроса это
  • б) как я узнаю, сколько строк является результатом выполненного запроса? Мне нужна эта информация, так как мне нужно создать уведомление со списком событий, которые являются частью результата запроса.
DataStream<Event> ds = ...        
String query = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";

        String query1 = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
        List<String> list = new ArrayList<>();
        list.add(query);
        list.add(query1);
       
        tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());


        for(int i =0; i< list.size(); i++ ){
            Table result = tabEnv.sqlQuery(list.get(i));
            DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
            dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {

            List<Row> listRow = new ArrayList<>();
            @Override
            public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
                listRow.add(booleanRowTuple2.f1);
            }
            });
        }

Ценю вашу помощь. спасибо Ашутош


person Ashutosh    schedule 04.08.2020    source источник


Ответы (1)


Чтобы отсортировать результаты по какому запросу, вы можете включить идентификатор для каждого запроса в сами запросы, например,

SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key

Определить количество строк в таблице результатов сложнее. Одна из проблем заключается в том, что нет окончательного ответа на количество результатов потокового запроса. Но там, где вы обрабатываете результаты, кажется, что вы можете подсчитать количество строк.

Или, и я не пробовал этого, но, возможно, вы могли бы использовать что-то вроде row_number() over(order by tumble_rowtime(rowTime, interval '10' second)), чтобы аннотировать каждую строку результата с помощью счетчика.

person David Anderson    schedule 05.08.2020
comment
Спасибо @David, попробую это решение. поскольку результат находится в потоке, и даже если я добавлю идентификатор в запрос, все равно трудно сказать, что результат запроса выполнен, и выполнить операцию с множеством этого результата. должен ли я подождать несколько секунд, а затем предположить, что все строки поступили в поток по идентификатору и готовы к обработке. пожалуйста предложите - person Ashutosh; 06.08.2020
comment
Если вход ограничен, вы можете использовать пакетную, а не потоковую среду выполнения, что упростит понимание того, что результаты завершены. - person David Anderson; 06.08.2020
comment
добавление идентификатора «10сек», как предлагается в возможном решении, не решит проблему, если результат запроса будет получен последовательно. для меня события ввода неограничены, что проблема и застряли, чтобы получить полноту результата запроса :( - person Ashutosh; 13.09.2020
comment
@Ashutosh Я не уверен, что вы имеете в виду под полнотой. Водяные знаки могут использоваться с отметками времени событий, чтобы определить, когда каждое окно завершено. Но если входной поток неограничен, то потоку результатов нет конца. Каждое окно будет давать только один результат, но одно окно будет одно за другим навсегда, если ввод не прекратится. - person David Anderson; 14.09.2020