Shuai Xu created FLINK-33689: -------------------------------- Summary: jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg Key: FLINK-33689 URL: https://issues.apache.org/jira/browse/FLINK-33689 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.18.0 Reporter: Shuai Xu
Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . {code:java} //代码占位符 def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} //代码占位符 List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)