[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784273#comment-17784273 ]
xuyang commented on FLINK-33489:
--------------------------------

As a quick fix, we can forbid generating a partial-final agg for this function. In the long term, we can use only DISTINCT in the partial agg and then apply the real agg function in the final agg. But first we must check the other agg functions. I'll try to fix it.

> LISTAGG with generating partial-final agg will cause wrong result
> ----------------------------------------------------------------
>
>                 Key: FLINK-33489
>                 URL: https://issues.apache.org/jira/browse/FLINK-33489
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>            Reporter: xuyang
>            Priority: Major
>
> Adding the following test case to SplitAggregateITCase reproduces this bug:
>
> {code:scala}
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
>     |SELECT
>     |  a,
>     |  LISTAGG(DISTINCT c, '#')
>     |FROM T
>     |GROUP BY a
>     """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   // Intended result per key (values in any order) if the '#' delimiter were honored
>   val expected = Map[String, List[String]](
>     "1" -> List("Hello 0", "Hello 1"),
>     "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
>     "3" -> List("Hello 0", "Hello 1"),
>     "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   // Printed for inspection: the values come back joined by ',' instead of '#'
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> }
> {code}
>
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`: the delimiter `#` is ignored.
>
> Here is its plan:
>
> {code}
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c])
> {code}
>
> The PARTIAL `GroupAggregate` receives the delimiter as `$f2`, but the FINAL `GroupAggregate` calls `LISTAGG_RETRACT($f3_0)` without the delimiter argument, so the default delimiter `,` is used.
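A minimal, Flink-free sketch of what the plan above does, assuming `String.hashCode` as a stand-in for `HASH_CODE` and plain Scala collections in place of the PARTIAL/FINAL `GroupAggregate` operators. `SplitListAggSketch`, `bucket`, `buggyPlan`, and `distinctOnlyPartialPlan` are hypothetical names, not Flink APIs. The real plan also buckets on `MOD(HASH_CODE('#'), 1024)`, which is constant for the query and is omitted here. `buggyPlan` mirrors the reported plan (delimiter used in PARTIAL, dropped in FINAL); `distinctOnlyPartialPlan` follows the long-term idea from the comment (PARTIAL only deduplicates, FINAL runs the real LISTAGG with the delimiter).

```scala
// Hypothetical, Flink-free simulation of the split (PARTIAL/FINAL) LISTAGG plan.
// String.hashCode stands in for HASH_CODE; plain collections stand in for operators.
object SplitListAggSketch {
  // Bucket count taken from MOD(HASH_CODE(c), 1024) in the plan.
  val Buckets = 1024

  // Split key for a value; floorMod keeps the bucket non-negative (a simplification).
  def bucket(c: String): Int = java.lang.Math.floorMod(c.hashCode, Buckets)

  // LISTAGG: concatenate values with a delimiter, ',' when none is given.
  def listAgg(values: Seq[String], delimiter: String = ","): String =
    values.mkString(delimiter)

  // Mirrors the reported plan: PARTIAL applies LISTAGG(DISTINCT c, delimiter)
  // per (a, bucket); FINAL merges the partial strings without the delimiter,
  // so the default ',' is used.
  def buggyPlan(rows: Seq[(Int, String)], delimiter: String): Map[Int, String] = {
    val partial: Map[(Int, Int), String] =
      rows
        .groupBy { case (a, c) => (a, bucket(c)) }
        .map { case (key, group) => key -> listAgg(group.map(_._2).distinct, delimiter) }
    partial.toSeq
      .groupBy { case ((a, _), _) => a }
      .map { case (a, parts) => a -> listAgg(parts.map(_._2)) } // delimiter dropped here
  }

  // Long-term idea from the comment: PARTIAL only deduplicates per (a, bucket);
  // FINAL runs the real LISTAGG with the user's delimiter.
  def distinctOnlyPartialPlan(rows: Seq[(Int, String)], delimiter: String): Map[Int, String] = {
    val distinctRows: Seq[(Int, String)] =
      rows
        .groupBy { case (a, c) => (a, bucket(c)) }
        .toSeq
        .flatMap { case ((a, _), group) => group.map(_._2).distinct.map(c => a -> c) }
    distinctRows
      .groupBy(_._1)
      .map { case (a, group) => a -> listAgg(group.map(_._2), delimiter) }
  }
}
```

Because every value of `c` hashes into exactly one bucket, deduplicating per bucket is already a global DISTINCT. That is why, in this sketch, the partial stage can skip LISTAGG entirely and leave the delimiter to the final stage, where it cannot be lost between stages.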