[ 
https://issues.apache.org/jira/browse/FLINK-21733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300102#comment-17300102
 ] 

Jark Wu commented on FLINK-21733:
---------------------------------

Fixed in 
 - master: c2d9e69450e0a1bf14aa866ecdd8f5c38072923d
 - release-1.12: TODO

> WatermarkAssigner incorrectly recomputing the rowtime index which may cause 
> ArrayIndexOutOfBoundsException
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21733
>                 URL: https://issues.apache.org/jira/browse/FLINK-21733
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.12.2
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
> WatermarkAssigner incorrectly recomputing the rowtime index in copy method 
> which may cause ArrayIndexOutOfBoundsException in such case:
> {code}
> @Test
>   def testProjectTransposeWatermarkAssigner(): Unit = {
>     val sourceDDL =
>       s"""
>          |CREATE TEMPORARY TABLE `t1` (
>          |  `a`  VARCHAR,
>          |  `b`  VARCHAR,
>          |  `c`  VARCHAR,
>          |  `d`  INT,
>          |  `t`  TIMESTAMP(3),
>          |  `ts` AS `t`,
>          |  WATERMARK FOR `ts` AS `ts`  - INTERVAL '10' SECOND
>          |) WITH (
>          |  'connector' = 'values',
>          |  'enable-watermark-push-down' = 'true',
>          |  'bounded' = 'false',
>          |  'disable-lookup' = 'true'
>          |)
>        """.stripMargin
>     util.tableEnv.executeSql(sourceDDL)
>     val sql =
>       s"""
>          |select a, b, ts
>          |from t1
>          |""".stripMargin
>     util.verifyPlan(sql)
>   }
> {code}
> exception stack
> {code}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
>       at 
> com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:75)
>       at org.apache.calcite.util.Util$TransformingList.get(Util.java:2732)
>       at 
> org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner.copy(WatermarkAssigner.scala:68)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.addRelToGraph(HepPlanner.java:805)
>       at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:158)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:282)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to