I have a stream like this: <_time(timestamp), uri(string), userId(int)>. The _time attribute is a rowtime attribute, and I register the stream as a table:
tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, uri,userId");
Then I query the table:
final String sql =
"SELECT tumble_start(_time, interval '10' second) as timestart, " +
" count(distinct userId) as uv, " +
" uri as uri, " +
" count(1) as pv " +
"FROM userVisitPage " +
"GROUP BY tumble(_time, interval '10' second), uri";
final Table table = tableEnv.sqlQuery(sql);
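To make the intended result concrete, here is a minimal plain-Java sketch (no Flink involved) of what I expect the query to compute: for each 10-second tumbling window and each uri, the number of distinct users (uv) and the number of visits (pv). The names TumbleSketch, Visit, Acc and aggregate are made up for illustration, and I am assuming timestamps in epoch milliseconds with windows aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TumbleSketch {
    // Hypothetical event type mirroring <_time, uri, userId>; time is epoch milliseconds (assumption).
    static final class Visit {
        final long time;
        final String uri;
        final int userId;

        Visit(long time, String uri, int userId) {
            this.time = time;
            this.uri = uri;
            this.userId = userId;
        }
    }

    // Per-(window, uri) accumulator: distinct users and total visits.
    static final class Acc {
        final Set<Integer> users = new HashSet<>();
        long pv;
    }

    static final long WINDOW_MS = 10_000L; // interval '10' second

    // Returns rows formatted as "windowStart,uri,uv,pv", ordered by window start, then uri.
    static List<String> aggregate(List<Visit> visits) {
        TreeMap<Long, TreeMap<String, Acc>> windows = new TreeMap<>();
        for (Visit v : visits) {
            // Start of the tumbling window that contains this event.
            long start = Math.floorDiv(v.time, WINDOW_MS) * WINDOW_MS;
            Acc acc = windows
                    .computeIfAbsent(start, k -> new TreeMap<>())
                    .computeIfAbsent(v.uri, k -> new Acc());
            acc.users.add(v.userId); // count(distinct userId)
            acc.pv++;                // count(1)
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, Acc>> w : windows.entrySet()) {
            for (Map.Entry<String, Acc> e : w.getValue().entrySet()) {
                Acc a = e.getValue();
                rows.add(w.getKey() + "," + e.getKey() + "," + a.users.size() + "," + a.pv);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Visit> visits = Arrays.asList(
                new Visit(1_000L, "/a", 1),
                new Visit(2_000L, "/a", 1),
                new Visit(3_000L, "/a", 2),
                new Visit(4_000L, "/b", 1),
                new Visit(12_000L, "/a", 3));
        // Prints:
        // 0,/a,2,3
        // 0,/b,1,1
        // 10000,/a,1,1
        aggregate(visits).forEach(System.out::println);
    }
}
```

This only illustrates the expected output on a bounded list; it ignores watermarks and late data, which the streaming query has to deal with.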
However, the query throws an exception:
org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321)
at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357)
How can I implement this query?