Hi, the exception says: "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.".
The problem is that your query first joins the two tables without a temporal condition and then wants to do a windowed grouping. Joins without temporal condition are not able to preserve the rowtime attribute. You should try to change the join into a time-windowed join [1] [2] by adding a BETWEEN predicate on the rowtime attributes of both tables. Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins Am Mi., 23. Okt. 2019 um 09:18 Uhr schrieb Manoj Kumar <man...@bigzetta.com >: > > *Hi All,* > > *[Explanation]* > > Two tables say lineitem and orders: > > Table > orderstbl=bsTableEnv.fromDataStream(orders,"a,b,c,d,e,f,g,h,i,orders.rowtime"); > Table > lineitemtbl=bsTableEnv.fromDataStream(lineitem,"a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,lineitem.rowtime"); > > bsTableEnv.registerTable("Orders",orderstbl); > bsTableEnv.registerTable("Lineitem",lineitemtbl) > > *#Rgular tumble window works* > Table sqlResult = bsTableEnv.sqlQuery("Select count(Orders.a) FROM Orders > GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)"); > Table sqlResult = bsTableEnv.sqlQuery("Select count(Lineitem.a) FROM > Lineitem GROUP BY TUMBLE(lineitem, INTERVAL '5' SECOND)"); > > *#Datastream TumblingEventTimeWindows joins also works fine* > > lineitem.join(orders).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(...) > > *But when I try to join them over same window it gives me error, it might > possible I am writing wrong SQL :(* > > Table sqlResult = bsTableEnv.sqlQuery("SELECT count(Lineitem.a) FROM " > + "Orders,Lineitem where Lineitem.a=Orders.a " > + "GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)"); > > Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query: > > FlinkLogicalSink(name=[sink], fields=[b]) > +- FlinkLogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)], > window=[TumblingGroupWindow], properties=[]) > +- FlinkLogicalCalc(select=[orders, a0]) > +- FlinkLogicalJoin(condition=[=($2, $0)], joinType=[inner]) > :- FlinkLogicalCalc(select=[a, orders]) > : +- > FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_3]]) > +- FlinkLogicalCalc(select=[a]) > +- > FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_6]]) > > Rowtime attributes must not be in the input rows of a regular join. As a > workaround you can cast the time attributes of input tables to TIMESTAMP > before. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:147) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at bzflink.StreamingTable.main(StreamingTable.java:65) > Caused by: org.apache.flink.table.api.TableException: Rowtime attributes > must not be in the input rows of a regular join. As a workaround you can > cast the time attributes of input tables to TIMESTAMP before. > at > org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecJoinRule.matches(StreamExecJoinRule.scala:88) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1522) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1795) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:325) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > ... 20 more > > Process finished with exit code 1 > > -- > Regards, > Manoj Kumar >