Smile created FLINK-19592:
-----------------------------

             Summary: MiniBatchGroupAggFunction should emit messages to prevent too early state eviction of downstream operators
                 Key: FLINK-19592
                 URL: https://issues.apache.org/jira/browse/FLINK-19592
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.12.0
            Reporter: Smile
Currently, [GroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183] emits a retraction (UPDATE_BEFORE) and an update (UPDATE_AFTER) message whenever a new record with the same key arrives and state cleaning is enabled, even if the aggregate value has not changed. According to [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566], this is intended to prevent too early state eviction of downstream operators. However, [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206] does not: it only emits messages when the aggregate value actually changes. Until [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566] is resolved, MiniBatchGroupAggFunction should emit these messages as well.

*GroupAggFunction.java:*
{code:java}
if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
    // newRow is the same as before and state cleaning is not enabled.
    // We do not emit retraction and acc message.
    // If state cleaning is enabled, we have to emit messages to prevent too early
    // state eviction of downstream operators.
    return;
} else {
    // retract previous result
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
}
{code}

*MiniBatchGroupAggFunction.java:*
{code:java}
if (!equaliser.equals(prevAggValue, newAggValue)) {
    // new row is not same with prev row
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
// new row is same with prev row, no need to output
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
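The difference between the two operators comes down to one condition. The following is a minimal, self-contained sketch of the per-key emit decision, assuming a simplified model in which aggregate values are plain strings and messages are rendered with the short strings of Flink's RowKind (+I, -U, +U). The class MiniBatchEmitSketch and its methods are hypothetical illustrations, not Flink code and not the actual patch. proposedEmit shows one possible fix: also emit when state cleaning is enabled, mirroring GroupAggFunction.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical, simplified model of the per-key emit decision.
 * Not Flink code: aggregate values are plain strings and messages are
 * rendered as "+I[v]", "-U[v]", "+U[v]" (RowKind short strings).
 * A null prevAggValue means this is the first row for the key.
 */
public class MiniBatchEmitSketch {

    /** Current MiniBatchGroupAggFunction behavior: emit only if the value changed. */
    public static List<String> currentEmit(String prevAggValue, String newAggValue,
                                           boolean stateCleaningEnabled,
                                           boolean generateUpdateBefore) {
        // stateCleaningEnabled is deliberately ignored here, which is the reported bug.
        return emit(prevAggValue, newAggValue, false, generateUpdateBefore);
    }

    /** Proposed behavior (assumption): like GroupAggFunction, also emit when state cleaning is on. */
    public static List<String> proposedEmit(String prevAggValue, String newAggValue,
                                            boolean stateCleaningEnabled,
                                            boolean generateUpdateBefore) {
        return emit(prevAggValue, newAggValue, stateCleaningEnabled, generateUpdateBefore);
    }

    private static List<String> emit(String prevAggValue, String newAggValue,
                                     boolean stateCleaningEnabled,
                                     boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (prevAggValue == null) {
            // First row for this key: emit an INSERT message.
            out.add("+I[" + newAggValue + "]");
            return out;
        }
        boolean changed = !Objects.equals(prevAggValue, newAggValue);
        if (stateCleaningEnabled || changed) {
            if (generateUpdateBefore) {
                // Retract the previous result.
                out.add("-U[" + prevAggValue + "]");
            }
            // Emit the new result, even if it equals the previous one,
            // so downstream operators see activity for this key.
            out.add("+U[" + newAggValue + "]");
        }
        // Unchanged value and state cleaning disabled: nothing is emitted.
        return out;
    }
}
{code}

For an unchanged aggregate with state cleaning enabled, currentEmit("3", "3", true, true) returns an empty list, while proposedEmit("3", "3", true, true) returns [-U[3], +U[3]], which is what GroupAggFunction produces in the same situation.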