[ 
https://issues.apache.org/jira/browse/FLINK-22781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351684#comment-17351684
 ] 

Andy commented on FLINK-22781:
------------------------------

[~godfreyhe] Thanks for pointing out the drawback.

After looking deeper into the problem, I found that the output of 
`StreamExecChangelogNormalize` changes when mini-batch is enabled.

When mini-batch is disabled, the output of `StreamExecChangelogNormalize` for key 
'Euro' is:

changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("-U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)) 

When mini-batch is enabled, the output of `StreamExecChangelogNormalize` for key 
'Euro' is:

changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L))

As a result, the downstream window aggregate emits 
'Euro,0,1970-01-01T00:00,1970-01-01T00:00:05' when mini-batch is disabled, 
because the +U/-U pair for rate 114 at 00:00:01 cancels out inside the window 
[00:00, 00:05). That record does not appear when mini-batch is enabled.
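To make the difference concrete, here is a heavily simplified Scala model of the two emission paths and the downstream tumbling-window count. It assumes a 5-second tumbling window, treats +I/+U as accumulate and -U/-D as retract messages, and models a mini-batch as "keep only the last upsert per key in the bundle". The names `MiniBatchNormalizeDemo`, `normalizeEach`, `normalizeBundle` and `windowCounts` are hypothetical and are not Flink classes.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model of the behaviour described above.
// It is NOT Flink's StreamExecChangelogNormalize or WindowOperator code.
object MiniBatchNormalizeDemo {

  // kind is "+I", "+U", "-U" or "-D"; ts is the rowtime in seconds.
  final case class Row(kind: String, key: String, rate: Long, ts: Long)

  // Upsert input (key, rate, ts) taken from the 'Euro' example.
  val upserts: Seq[(String, Long, Long)] = Seq(("Euro", 114L, 1L), ("Euro", 118L, 6L))

  // Mini-batch disabled: each upsert is normalized against the previous value
  // of its key right away, so an update yields a -U (old) / +U (new) pair.
  def normalizeEach(in: Seq[(String, Long, Long)]): Seq[Row] = {
    val last = mutable.Map.empty[String, Row]
    in.flatMap { case (k, rate, ts) =>
      val cur = Row("+U", k, rate, ts)
      val out = last.get(k).map(prev => prev.copy(kind = "-U")).toList :+ cur
      last(k) = cur
      out
    }
  }

  // Mini-batch enabled: all upserts land in one bundle and only the latest
  // value per key survives before it is emitted.
  def normalizeBundle(in: Seq[(String, Long, Long)]): Seq[Row] =
    in.groupBy(_._1).toSeq.map { case (k, group) =>
      val (_, rate, ts) = group.last
      Row("+U", k, rate, ts)
    }

  // Tumbling 5-second COUNT(*) per (key, window start): accumulate messages
  // (+I/+U) add 1, retract messages (-U/-D) subtract 1.
  def windowCounts(rows: Seq[Row]): Map[(String, Long), Long] =
    rows.foldLeft(Map.empty[(String, Long), Long]) { (acc, row) =>
      val window = (row.key, row.ts / 5 * 5)
      val delta = if (row.kind == "+I" || row.kind == "+U") 1L else -1L
      acc.updated(window, acc.getOrElse(window, 0L) + delta)
    }
}
```

Without mini-batch, window (Euro, 0) is touched and ends with a count of 0; if every touched window fires, that is the extra 'Euro,0,...' record. With mini-batch, window 0 is never touched at all.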
 
*In conclusion, the problem has two causes:*
*1. When mini-batch is enabled, `StreamExecChangelogNormalize` emits differently 
(the intermediate +U/-U pair within a batch is folded away).*
*2. WindowOperator should not emit data once its counter drops to 0.*
 
*Because WindowOperator already stores a counter per key, we can make a 
simple fix without breaking state compatibility.*
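A minimal sketch of the proposed fix, assuming the operator keeps a per-window record counter next to its accumulators: when the window fires, a counter of 0 (or no counter at all) means every record was retracted, so nothing is emitted. `EmptyWindowSuppression`, `TumblingCounter`, `process` and `fire` are hypothetical names for illustration only; the real change would live in Flink's WindowOperator and reuse its existing state, which this sketch does not model.

```scala
import scala.collection.mutable

// Hypothetical sketch of the proposed fix, not Flink's actual WindowOperator:
// keep a per-window record counter and skip emission when it is back to 0.
object EmptyWindowSuppression {

  // kind is "+I", "+U", "-U" or "-D"; ts is the rowtime in seconds.
  final case class Row(kind: String, key: String, ts: Long)

  final class TumblingCounter(sizeSeconds: Long) {
    // (key, window start) -> number of live (not yet retracted) records
    private val counters = mutable.Map.empty[(String, Long), Long]

    def process(row: Row): Unit = {
      val window = (row.key, row.ts / sizeSeconds * sizeSeconds)
      val delta = if (row.kind == "+I" || row.kind == "+U") 1L else -1L
      counters(window) = counters.getOrElse(window, 0L) + delta
    }

    // Called when the watermark passes the window end. Returns
    // (key, count, windowStart) to emit, or None if the window is empty.
    def fire(key: String, windowStart: Long): Option[(String, Long, Long)] =
      counters.remove((key, windowStart)) match {
        case Some(count) if count > 0 => Some((key, count, windowStart))
        case _                        => None // all records retracted: emit nothing
      }
  }
}
```

Feeding it the non-mini-batch 'Euro' output, firing window 0 returns None and firing window 5 returns Some(("Euro", 1, 5)), so both mini-batch settings would produce the same result.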
 
I'd be happy to fix the bug; please assign the issue to me.

> Incorrect result for group window aggregate when mini-batch is enabled
> ----------------------------------------------------------------------
>
>                 Key: FLINK-22781
>                 URL: https://issues.apache.org/jira/browse/FLINK-22781
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.0
>            Reporter: godfrey he
>            Priority: Critical
>
> We can reproduce this issue by adding the following code to the 
> {{GroupWindowITCase#testWindowAggregateOnUpsertSource}} method:
> {code:java}
>     tEnv.getConfig.getConfiguration.setBoolean(
>       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
>     tEnv.getConfig.getConfiguration.set(
>       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
>     tEnv.getConfig.getConfiguration.setLong(
>       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 10L)
> {code}
> The reason is that a group window without any data (the data may have been 
> retracted) should not emit any record.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
