Hi,

The exception says: "Rowtime attributes must not be in the input rows of a
regular join. As a workaround you can cast the time attributes of input
tables to TIMESTAMP before.".

The problem is that your query first joins the two tables without a
temporal condition and then applies a windowed grouping to the result.
A join without a temporal condition (a regular join) cannot preserve the
rowtime attribute, which is why the planner rejects rowtime attributes in
its input rows.
Try changing the join into a time-windowed join [1] [2] by
adding a BETWEEN predicate on the rowtime attributes of both tables.
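
A minimal sketch of what such a query could look like, using the table and column names from your mail (orders and lineitem are the rowtime attributes of Orders and Lineitem). The class name, the method name, and the 5-second join bound are assumptions for illustration, and the query is untested against your job. Note that the BETWEEN bound only limits how far apart two matching rows may be in event time; it does not mean the same thing as joining inside one tumbling window in the DataStream API.

```java
// Sketch only: builds the SQL text for a time-windowed join followed by a tumbling window.
// The class name, method name, and the join bound are illustrative assumptions.
class WindowedJoinQuery {

    // Returns a query that joins Orders and Lineitem on column a, keeps only pairs whose
    // rowtime attributes lie within boundSeconds of each other, and then counts rows per
    // tumbling window of windowSeconds on the Orders rowtime attribute.
    static String build(int boundSeconds, int windowSeconds) {
        String bound = "INTERVAL '" + boundSeconds + "' SECOND";
        String window = "INTERVAL '" + windowSeconds + "' SECOND";
        return "SELECT COUNT(Lineitem.a) "
            + "FROM Orders, Lineitem "
            + "WHERE Lineitem.a = Orders.a "
            + "AND Lineitem.lineitem BETWEEN Orders.orders - " + bound
            + " AND Orders.orders + " + bound + " "
            + "GROUP BY TUMBLE(Orders.orders, " + window + ")";
    }

    public static void main(String[] args) {
        // Pass the resulting string to bsTableEnv.sqlQuery(...) as in the original program.
        System.out.println(build(5, 5));
    }
}
```

The temporal condition sits in the WHERE clause next to the equality predicate, which is what lets the planner treat the join as a time-windowed join instead of a regular one.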

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins

On Wed, 23 Oct 2019 at 09:18, Manoj Kumar <man...@bigzetta.com> wrote:

>
> *Hi All,*
>
> *[Explanation]*
>
> Two tables say lineitem and orders:
>
> Table orderstbl = bsTableEnv.fromDataStream(orders, "a,b,c,d,e,f,g,h,i,orders.rowtime");
> Table lineitemtbl = bsTableEnv.fromDataStream(lineitem, "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,lineitem.rowtime");
>
> bsTableEnv.registerTable("Orders", orderstbl);
> bsTableEnv.registerTable("Lineitem", lineitemtbl);
>
> *#Regular tumble window works*
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Orders.a) FROM Orders
> GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Lineitem.a) FROM
> Lineitem GROUP BY TUMBLE(lineitem, INTERVAL '5' SECOND)");
>
> *#DataStream TumblingEventTimeWindows join also works fine*
>
> lineitem.join(orders).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(...)
>
> *But when I try to join them over the same window, I get an error. It is
> possible that I am writing the SQL wrong :(*
>
> Table sqlResult  = bsTableEnv.sqlQuery("SELECT    count(Lineitem.a) FROM "
>         + "Orders,Lineitem where Lineitem.a=Orders.a "
>         + "GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalSink(name=[sink], fields=[b])
> +- FlinkLogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
> window=[TumblingGroupWindow], properties=[])
>    +- FlinkLogicalCalc(select=[orders, a0])
>       +- FlinkLogicalJoin(condition=[=($2, $0)], joinType=[inner])
>          :- FlinkLogicalCalc(select=[a, orders])
>          :  +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_3]])
>          +- FlinkLogicalCalc(select=[a])
>             +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_6]])
>
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:147)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
> at
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
> at bzflink.StreamingTable.main(StreamingTable.java:65)
> Caused by: org.apache.flink.table.api.TableException: Rowtime attributes
> must not be in the input rows of a regular join. As a workaround you can
> cast the time attributes of input tables to TIMESTAMP before.
> at
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecJoinRule.matches(StreamExecJoinRule.scala:88)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:367)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1522)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1795)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1656)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:325)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> ... 20 more
>
> Process finished with exit code 1
>
> --
> Regards,
> Manoj  Kumar
>
