[ https://issues.apache.org/jira/browse/FLINK-21962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao updated FLINK-21962: ---------------------------- Fix Version/s: 1.16.0 > SQL Group Windows do not work on Flink 1.11 > -------------------------------------------- > > Key: FLINK-21962 > URL: https://issues.apache.org/jira/browse/FLINK-21962 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.11.3 > Reporter: Barak Ben-Nathan > Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned > Fix For: 1.15.0, 1.16.0 > > > I am running this on Flink ver.1.11.3: > > {code:java} > val bsEnv: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val bsSettings: EnvironmentSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val bsTableEnv: StreamTableEnvironment = > StreamTableEnvironment.create(bsEnv, bsSettings) > val cat = new GenericInMemoryCatalog("custom_catalog", "customDB") > bsTableEnv.registerCatalog("custom_catalog", cat) > bsTableEnv.useCatalog("custom_catalog") > bsTableEnv.useDatabase("customDB") > val createInputTableDDL = > """ > | > |CREATE TABLE kafkaTable ( > | user_id BIGINT, > | item_id BIGINT, > | category_id BIGINT, > | behavior STRING, > | timestamp1 TIMESTAMP(3), > | WATERMARK FOR timestamp1 AS timestamp1 > |) WITH ( > | 'connector' = 'kafka', > | 'topic' = 'test-in', > | 'properties.bootstrap.servers' = 'localhost:9092', > | 'properties.group.id' = 'testGroup', > | 'format' = 'json', > | 'scan.startup.mode' = 'earliest-offset' > |) > |""".stripMargin > val createOutputTableDDL = > """ > |CREATE TABLE kafkaOutTable ( > | strt TIMESTAMP(3), > | sum_ijh BIGINT > |) WITH ( > | 'connector' = 'kafka', > | 'topic' = 'test-out', > | 'properties.bootstrap.servers' = 'localhost:9092', > | 'properties.group.id' = 'testGroup', > | 'format' = 'json', > | 'scan.startup.mode' = 'earliest-offset' > |) > |""".stripMargin > bsTableEnv.executeSql(createInputTableDDL) > bsTableEnv.executeSql(createOutputTableDDL) > val result = 
bsTableEnv.sqlQuery( > "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' > MINUTE) as strt, sum(item_id) as sum_ijh FROM kafkaTable " + > "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)" > ) > result.executeInsert("kafkaOutTable") > {code} > When setting log level to TRACE, the job fails with this exception: > {code:java} > Exception in thread "main" java.lang.ClassCastException: > org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to > org.apache.flink.table.planner.plan.cost.FlinkCostFactory at > org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41) > at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown > Source) at > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown > Source) at > org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown > Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown > Source) at > GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) > at > 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > scala.collection.Iterator.foreach(Iterator.scala:943) at > scala.collection.Iterator.foreach$(Iterator.scala:943) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown > Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown > Source) at > GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > scala.collection.Iterator.foreach(Iterator.scala:943) at > scala.collection.Iterator.foreach$(Iterator.scala:943) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown > Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown > Source) at > GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) > at > org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > scala.collection.Iterator.foreach(Iterator.scala:943) at > scala.collection.Iterator.foreach$(Iterator.scala:943) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) > at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown > Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown > Source) at > org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) > at > org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249) > at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at > org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > scala.collection.Iterator.foreach(Iterator.scala:943) at > scala.collection.Iterator.foreach$(Iterator.scala:943) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > 
scala.collection.immutable.Range.foreach(Range.scala:158) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at > scala.collection.Iterator.foreach(Iterator.scala:943) at > scala.collection.Iterator.foreach$(Iterator.scala:943) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at > 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > at > org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) > at > org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code} > It might be related to this: > https://issues.apache.org/jira/browse/FLINK-15333 > -- This message was sent by Atlassian Jira (v8.20.1#820001)