Shuai Xu created FLINK-33689:
--------------------------------

             Summary: jsonObjectAggFunction can't retract previous data which 
is invalid when enable local global agg
                 Key: FLINK-33689
                 URL: https://issues.apache.org/jira/browse/FLINK-33689
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.18.0
            Reporter: Shuai Xu

Run the test as following and enable LocalGlobal and minibatch  in 
sql/AggregateITCase . 
{code:java}
//代码占位符
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
    s"""
       |select
       |   JSON_OBJECTAGG(key k value v)
       |from (select
       |             cast(SUM(a) as string) as k,t as v
       |       from
       |             Table6
       |       group by t)
       |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
    List(
      "{\"30\":2}"
    )
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
//代码占位符
List({"14":2,"30":2}) {code}
However,  \{"14":2}  should be retracted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to