Apache Flink: как использовать DISTINCT во временном окне TUMBLE?

У меня есть такой поток: <_time(timestamp), uri(string), userId(int)>. Атрибут _time - это rowtime, и я регистрирую его как таблицу:

tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, uri,userId");

Затем я запрашиваю таблицу:

final String sql =
       "SELECT tumble_start(_time, interval '10' second) as timestart, " +
       "  count(distinct userId) as uv, " +
       "  uri as uri, " +
       "  count(1) as pv " +
       "FROM userVisitPage " +
       "GROUP BY tumble(_time, interval '10' second), uri";

final Table table = tableEnv.sqlQuery(sql);

Однако запрос выдает исключение:

org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321)
    at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357)

Как я могу реализовать этот запрос?


person 刘嘉睿    schedule 02.03.2018    source источник
comment
Можете ли вы опубликовать полную трассировку стека исключения?   -  person Fabian Hueske    schedule 02.03.2018


Ответы (1)


Обновление: доступен Flink 1.6.0, который поддерживает агрегаты DISTINCT для потоковых таблиц.

Flink (версия 1.4.x) пока не поддерживает SQL-запросы с агрегированием DISTINCT в потоковых таблицах. Поддержка нацелена на Flink 1.6, который выйдет не раньше середины 2018 года.

Однако вы можете реализовать определяемая пользователем функция агрегирования, чтобы вычислять отдельные счетчики и использовать эту функцию в ваших запросах после их регистрации. Конечно, синтаксис запроса будет другим.

person Fabian Hueske    schedule 20.03.2018
comment
Как он использует окно, так как сбросить аккумулятор в конце времени окна? ResetAccumulator будет вызываться автоматически при мигании в конце окна? например, нам нужно сбросить аккумулятор перед следующим падением нового окна. - person YuFeng Shen; 27.08.2018
comment
Мне жаль. Я не понимаю вашего вопроса. Я бы рекомендовал отправить сообщение в список рассылки Flink, который лучше подходит для обсуждения деталей реализации. Также был выпущен Flink 1.6.0 с поддержкой агрегирования DISTINCT. - person Fabian Hueske; 27.08.2018