Hi Smile,

Thanks for bringing up this discussion. I think you are right, it's a bug: MiniBatchGroupAggFunction should also emit these messages when state cleaning is enabled. You can open a JIRA issue for this, and offer to fix it if you wish.

Moreover, the current MiniBatchGroupAggFunction has a more severe bug: it does not clean up expired state, see FLINK-17096.
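To make the difference concrete, here is a minimal stand-alone sketch of the two emit conditions from the snippets quoted in this thread, plus one possible shape of a fix. It is not Flink code: the class EmitDecision and its method names are hypothetical, Objects.equals stands in for the generated equaliser, and the "proposed" variant is only an assumption about what a fix could look like, not the actual patch.

```java
import java.util.Objects;

/** Hypothetical model of the emit decision discussed in this thread; not Flink code. */
public class EmitDecision {

    /**
     * Condition quoted from GroupAggFunction: output is skipped only when
     * state cleaning is disabled AND the aggregate value is unchanged.
     */
    public static boolean groupAggEmits(boolean stateCleaningEnabled, Object prev, Object next) {
        return stateCleaningEnabled || !Objects.equals(prev, next);
    }

    /**
     * Condition quoted from MiniBatchGroupAggFunction: output is skipped
     * whenever the aggregate value is unchanged, regardless of state cleaning.
     */
    public static boolean miniBatchEmitsCurrent(Object prev, Object next) {
        return !Objects.equals(prev, next);
    }

    /**
     * Assumed fix: apply the same condition as GroupAggFunction, so unchanged
     * values are still re-emitted when state cleaning is enabled.
     */
    public static boolean miniBatchEmitsProposed(boolean stateCleaningEnabled, Object prev, Object next) {
        return groupAggEmits(stateCleaningEnabled, prev, next);
    }

    public static void main(String[] args) {
        // Same key arrives again with an unchanged aggregate value,
        // while state cleaning is enabled.
        System.out.println("GroupAgg emits:            " + groupAggEmits(true, "a", "a"));
        System.out.println("MiniBatch (current) emits: " + miniBatchEmitsCurrent("a", "a"));
        System.out.println("MiniBatch (proposed) emits: " + miniBatchEmitsProposed(true, "a", "a"));
    }
}
```

The only case where the two quoted conditions disagree is an unchanged value with state cleaning enabled, which is exactly the case reported below.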
On Mon, Oct 12, 2020 at 6:15 PM, Smile <letters_sm...@163.com> wrote:

> When using SELECT DISTINCT in Flink SQL I found that when a new message
> with the same key arrives, it will emit a retract and a new insert message.
> According to JIRA-FLINK-8566
> <https://issues.apache.org/jira/browse/FLINK-8566> and JIRA-FLINK-8564
> <https://issues.apache.org/jira/browse/FLINK-8564>, it's a feature to
> prevent too early state eviction of downstream operators when state
> cleaning is enabled. However, when I enabled MiniBatch Aggregation
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>,
> it stops emitting those retract and new messages.
>
> According to the source code of GroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183>
> and MiniBatchGroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>,
> when MiniBatch Aggregation is enabled, it does not emit those messages.
>
> But why not? I don't think MiniBatch Aggregation can avoid state eviction
> of downstream operators anyway.
>
> *GroupAggFunction.java:*
>
>     if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
>         // newRow is the same as before and state cleaning is not enabled.
>         // We do not emit retraction and acc message.
>         // If state cleaning is enabled, we have to emit messages to prevent too early
>         // state eviction of downstream operators.
>         return;
>     } else {
>         // retract previous result
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     }
>
> *MiniBatchGroupAggFunction.java:*
>
>     if (!equaliser.equals(prevAggValue, newAggValue)) {
>         // new row is not same with prev row
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>         out.collect(resultRow);
>     }
>     // new row is same with prev row, no need to output
>
> Flink Version: 1.12-SNAPSHOT (GitHub Master <https://github.com/apache/flink>)
> Planner: Blink Planner
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

--
Best,
Benchao Li