[ https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jinfeng updated FLINK-16451: ---------------------------- Summary: listagg with distinct for over window codegen error (was: listagg with distinct for over window ) > listagg with distinct for over window codegen error > ---------------------------------------------------- > > Key: FLINK-16451 > URL: https://issues.apache.org/jira/browse/FLINK-16451 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.9.2, 1.10.0 > Reporter: jinfeng > Priority: Major > > When I use listagg with distinct and an over window: > {code:java} > //代码占位符 > "select listagg(distinct product, '|') over(partition by user order by > proctime rows between 200 preceding and current row) as product, user from " > + testTable > {code} > I got the following exception: > {code:java} > //代码占位符 > Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, > Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at > java.util.ArrayList.get(ArrayList.java:433) at > java.util.Collections$UnmodifiableList.get(Collections.java:1311) at > org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635) > at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620) > at > org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at 
scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374) > at > org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > {code} > But it worked with > {code:java} > //代码占位符 > select listagg(distinct product) over(partition by user order by proctime > rows between 200 preceding and current row) as product, user from " + > testTable > {code} > > {code:java} > //代码占位符 > private def generateKeyExpression( > ctx: CodeGeneratorContext, > generator: ExprCodeGenerator): GeneratedExpression = { > val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess( > ctx, > generator.input1Type, > generator.input1Term, > _, > nullableInput = false, > deepCopy = inputFieldCopy)) > {code} > The exception is thrown by the code above. > The distinctInfo.argIndexes is [1, 3], but index 3 is a logical index. > It will be replaced by '|', so input access should not be generated for > index 3. -- This message was sent by Atlassian Jira (v8.3.4#803005)