[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shengkai Fang reassigned FLINK-33489:
-------------------------------------

    Assignee: xuyang

> LISTAGG with generating partial-final agg will cause wrong result
> -----------------------------------------------------------------
>
>              Key: FLINK-33489
>              URL: https://issues.apache.org/jira/browse/FLINK-33489
>          Project: Flink
>       Issue Type: Bug
>       Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>         Reporter: xuyang
>         Assignee: xuyang
>         Priority: Major
>
> Adding the following test case to SplitAggregateITCase reproduces this bug:
>
> {code:java}
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
>                            |SELECT
>                            |  a,
>                            |  LISTAGG(DISTINCT c, '#')
>                            |FROM T
>                            |GROUP BY a
>       """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>
>   // Expected distinct values of c per key a (LISTAGG should join them with '#').
>   // Not asserted: the test only prints the sorted retract results for inspection.
>   val expected = Map[String, List[String]](
>     "1" -> List("Hello 0", "Hello 1"),
>     "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
>     "3" -> List("Hello 0", "Hello 1"),
>     "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> }
> {code}
>
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`: the delimiter `#` is ignored and the values are joined with `,` instead.
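The test builds an `expected` map but never compares it with `actualData`. As a minimal sketch (a hypothetical Java helper, not part of Flink's test utilities), a single LISTAGG value can be checked against its expected list by splitting on the `#` delimiter and comparing as sets, since the reported output lists the values in no particular order. Under that check, the reported value for key `1` fails because splitting `Hello 0,Hello 1` on `#` yields a single piece.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink's test utilities.
public class ListAggResultCheck {

    // Splits a LISTAGG result on the expected delimiter and compares the pieces with the
    // expected values as sets, because the element order produced by LISTAGG is not fixed.
    public static boolean matches(String listAggResult, String delimiter, List<String> expected) {
        Set<String> actual = new TreeSet<>(
                Arrays.asList(listAggResult.split(Pattern.quote(delimiter), -1)));
        return actual.equals(new TreeSet<>(expected));
    }

    public static void main(String[] args) {
        List<String> expectedForKey1 = List.of("Hello 0", "Hello 1");
        // A correct result for key "1", in any order, with '#' between the values.
        System.out.println(matches("Hello 1#Hello 0", "#", expectedForKey1)); // true
        // The reported value for key "1": splitting on '#' yields one piece, so it fails.
        System.out.println(matches("Hello 0,Hello 1", "#", expectedForKey1)); // false
    }
}
```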
>
> Let's look at its plan:
>
> {code:java}
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c])
> {code}
>
> The final `GroupAggregate` is missing the delimiter argument: the PARTIAL stage calls `LISTAGG(DISTINCT c, $f2)`, but the FINAL stage calls `LISTAGG_RETRACT($f3_0)` without `$f2`, so the default delimiter `,` is used.
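To make the mechanism concrete, here is a simplified, hypothetical model of the two aggregation stages in plain Java; it is not Flink code, and the class and method names are illustrative only. It assumes Java's `String.hashCode()` as a stand-in for Flink's `HASH_CODE`. Because the PARTIAL stage groups `DISTINCT c` by `MOD(HASH_CODE(c), 1024)`, each partial group usually holds a single value, so the `#` used inside the partial rarely shows up and the FINAL stage's delimiter decides what the output looks like.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the PARTIAL/FINAL LISTAGG split shown in the plan; not Flink code.
public class SplitListAggSketch {

    // One input row with the grouping key `a` and the aggregated column `c`.
    public static final class InputRow {
        final String a;
        final String c;

        public InputRow(String a, String c) {
            this.a = a;
            this.c = c;
        }
    }

    // PARTIAL stage: group by (a, MOD(hash(c), 1024)) and compute LISTAGG(DISTINCT c, delimiter).
    // Assumption: String.hashCode() stands in for Flink's HASH_CODE.
    public static Map<String, List<String>> partial(List<InputRow> rows, String delimiter) {
        Map<String, Map<Integer, Set<String>>> groups = new LinkedHashMap<>();
        for (InputRow row : rows) {
            int bucket = Math.floorMod(row.c.hashCode(), 1024);
            groups.computeIfAbsent(row.a, k -> new LinkedHashMap<>())
                  .computeIfAbsent(bucket, k -> new LinkedHashSet<>())
                  .add(row.c); // the set drops duplicates, like DISTINCT
        }
        Map<String, List<String>> partials = new LinkedHashMap<>();
        groups.forEach((a, buckets) -> {
            List<String> perBucket = new ArrayList<>();
            for (Set<String> values : buckets.values()) {
                perBucket.add(String.join(delimiter, values));
            }
            partials.put(a, perBucket);
        });
        return partials;
    }

    // FINAL stage: group by a and concatenate the partial results with finalDelimiter.
    public static Map<String, String> finalAgg(Map<String, List<String>> partials, String finalDelimiter) {
        Map<String, String> result = new LinkedHashMap<>();
        partials.forEach((a, parts) -> result.put(a, String.join(finalDelimiter, parts)));
        return result;
    }

    public static void main(String[] args) {
        List<InputRow> rows = List.of(
                new InputRow("1", "Hello 0"),
                new InputRow("1", "Hello 1"),
                new InputRow("1", "Hello 0"));
        Map<String, List<String>> partials = partial(rows, "#");

        // Buggy plan: LISTAGG_RETRACT($f3_0) gets no delimiter, so the default ',' is used.
        System.out.println(finalAgg(partials, ",")); // {1=Hello 0,Hello 1}
        // Fixed plan: the '#' delimiter is forwarded to the FINAL stage as well.
        System.out.println(finalAgg(partials, "#")); // {1=Hello 0#Hello 1}
    }
}
```

In this model, forwarding the same delimiter to both stages is what makes the split plan produce the same string as a single-stage `LISTAGG(DISTINCT c, '#')`; how the planner should pass `$f2` through to `LISTAGG_RETRACT` is left to the actual fix.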