[ https://issues.apache.org/jira/browse/FLINK-22781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351684#comment-17351684 ]
Andy commented on FLINK-22781:
------------------------------

[~godfreyhe] Thanks for pointing out the drawback. After looking deeper into the problem, I found that the output of `StreamExecChangelogNormalize` changes when mini-batch is enabled.

When mini-batch is disabled, the output of `StreamExecChangelogNormalize` for key 'Euro' is:

changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("-U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L))

When mini-batch is enabled, the output of `StreamExecChangelogNormalize` for key 'Euro' is:

changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L))

As a result, the downstream window aggregate emits the record 'Euro,0,1970-01-01T00:00,1970-01-01T00:00:05' when mini-batch is disabled, while that record does not appear when mini-batch is enabled.

*In conclusion, the problem is caused by two reasons:*
*1. When mini-batch is enabled, the emit behavior of `StreamExecChangelogNormalize` is different.*
*2. WindowOperator should not emit data if the counter becomes 0.*

*Because WindowOperator already stores the counter per key, we could do a simple fix without breaking state compatibility.*

It would be my pleasure to fix the bug; please assign the issue to me.
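
To make reason 2 concrete, here is a minimal sketch of the idea, not the actual WindowOperator code. The class `RetractAwareWindowSketch` and its methods `accumulate`, `retract`, and `fire` are hypothetical names made up for illustration, and the aggregate is assumed to be a simple sum. It only shows a per-key, per-window counter suppressing the output once every accumulated row has been retracted.

```java
import java.util.Optional;

/**
 * Hypothetical stand-in for the state kept for one key in one window.
 * This is NOT Flink's WindowOperator; it only illustrates the counter check.
 */
final class RetractAwareWindowSketch {
    // Accumulated rows minus retracted rows for this key and window.
    private long recordCount = 0L;
    // Assumed aggregate: a running sum of the values.
    private long sum = 0L;

    /** Handles an insert or update-after row. */
    public void accumulate(long value) {
        recordCount++;
        sum += value;
    }

    /** Handles a delete or update-before row. */
    public void retract(long value) {
        recordCount--;
        sum -= value;
    }

    /** Returns the result to emit when the window fires, or empty if no rows remain. */
    public Optional<Long> fire() {
        return recordCount == 0L ? Optional.empty() : Optional.of(sum);
    }

    public static void main(String[] args) {
        // Mini-batch disabled: +U 114 and -U 114 both reach the first window.
        RetractAwareWindowSketch window = new RetractAwareWindowSketch();
        window.accumulate(114L);
        window.retract(114L);
        System.out.println("result present: " + window.fire().isPresent());
    }
}
```

With this check, the window that sees only the +U/-U pair for 114 stays silent, which matches what the mini-batch run produces for that window.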
> Incorrect result for group window aggregate when mini-batch is enabled
> ----------------------------------------------------------------------
>
>                 Key: FLINK-22781
>                 URL: https://issues.apache.org/jira/browse/FLINK-22781
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.0
>            Reporter: godfrey he
>            Priority: Critical
>
> We can reproduce this issue through adding the following code to
> {{GroupWindowITCase#testWindowAggregateOnUpsertSource}} method:
> {code:java}
> tEnv.getConfig.getConfiguration.setBoolean(
>   ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
> tEnv.getConfig.getConfiguration.set(
>   ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
>   Duration.ofSeconds(1))
> tEnv.getConfig.getConfiguration.setLong(
>   ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 10L)
> {code}
> The reason is the group window without any data (the data may be retracted)
> should not send any record.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)