xuyang created FLINK-33760:
------------------------------

Summary: Group Window agg has different result when only consuming -D records while using or not using minibatch
Key: FLINK-33760
URL: https://issues.apache.org/jira/browse/FLINK-33760
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Reporter: xuyang

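The summary above compares runs with and without MiniBatch. As a rough sketch only, MiniBatch can be switched on for a table environment through the standard `table.exec.mini-batch.*` options. The `tEnv` variable is the table environment used by the test, and the latency and size values are illustrative assumptions, not settings taken from this report; how the test harness itself toggles MiniBatch is not shown here.

{code:java}
// Sketch: enable MiniBatch on an existing TableEnvironment (tEnv).
// The values "1 s" and "3" are assumptions chosen for illustration.
tEnv.getConfig.set("table.exec.mini-batch.enabled", "true")
tEnv.getConfig.set("table.exec.mini-batch.allow-latency", "1 s")
tEnv.getConfig.set("table.exec.mini-batch.size", "3")

// To compare against the non-MiniBatch path, leave the options unset
// or set "table.exec.mini-batch.enabled" to "false".
{code}
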
Add the test in AggregateITCase to reproduce this bug.

{code:java}
@Test
def test(): Unit = {
  val upsertSourceCurrencyData = List(
    changelogRow("-D", 1.bigDecimal, "a"),
    changelogRow("-D", 1.bigDecimal, "b"),
    changelogRow("-D", 1.bigDecimal, "b")
  )

  val upsertSourceDataId = registerData(upsertSourceCurrencyData);
  tEnv.executeSql(s"""
                     |CREATE TABLE T (
                     | `a` DECIMAL(32, 8),
                     | `d` STRING,
                     | proctime as proctime()
                     |) WITH (
                     | 'connector' = 'values',
                     | 'data-id' = '$upsertSourceDataId',
                     | 'changelog-mode' = 'I,UA,UB,D',
                     | 'failing-source' = 'true'
                     |)
                     |""".stripMargin)

  val sql =
    "SELECT max(a), sum(a), min(a), TUMBLE_START(proctime, INTERVAL '0.005' SECOND), TUMBLE_END(proctime, INTERVAL '0.005' SECOND), d FROM T GROUP BY d, TUMBLE(proctime, INTERVAL '0.005' SECOND)"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
  env.execute()

  // Use the result precision/scale calculated for sum and don't override with the one calculated
  // for plus()/minus(), which results in losing a decimal digit.
  val expected = List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
  assertEquals(expected, sink.getRetractResults.sorted)
}
{code}

When MiniBatch is ON, the result is `List()`.

When MiniBatch is OFF, the result is `List(null,-1.00000000,null,2023-12-06T11:29:21.895,2023-12-06T11:29:21.900,a)`.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)