[ https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jin updated FLINK-33936:
-----------------------------
    Summary: Outputting Identical Results in Mini-Batch Aggregation with Set TTL  (was: The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.)

> Outputting Identical Results in Mini-Batch Aggregation with Set TTL
> -------------------------------------------------------------------
>
>                 Key: FLINK-33936
>                 URL: https://issues.apache.org/jira/browse/FLINK-33936
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.0
>            Reporter: Feng Jin
>            Priority: Major
>
> Currently, if mini-batch is enabled and the aggregated result is the same as
> the previous output, the current aggregation result is not sent downstream.
> As a result, downstream nodes do not receive updated data, and if a state TTL
> is set, the TTL of the downstream state is not refreshed either.
> The specific logic is as follows:
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
>     // new row is not same with prev row
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow
>                 .replace(currentKey, prevAggValue)
>                 .setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, new results are still sent if TTL is set,
> even if the aggregation result is the same as last time.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
>     // newRow is the same as before and state cleaning is not enabled.
>     // We do not emit retraction and acc message.
>     // If state cleaning is enabled, we have to emit messages to prevent too early
>     // state eviction of downstream operators.
>     return;
> } else {
>     // retract previous result
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow
>                 .replace(currentKey, prevAggValue)
>                 .setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, for TTL scenarios, I believe that when mini-batch aggregation is
> enabled, new results should also be output when the aggregated result is the
> same as the previous one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
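
To illustrate the problem, here is a minimal, self-contained Java sketch. It is not Flink code: `MiniBatchTtlSketch`, `EmitRule`, `emitCurrent`, `emitProposed` and `downstreamAliveAt` are hypothetical names. It assumes a simplified downstream model in which a key's state TTL is refreshed only when a record for that key arrives, and the state expires once `ttl` time units pass without one. `emitCurrent` models the mini-batch check quoted above; `emitProposed` mirrors the `stateRetentionTime <= 0 && equaliser.equals(...)` check in `GroupAggFunction`.

```java
import java.util.List;

// Hypothetical sketch, not Flink code: compares two emit rules for a mini-batch
// group aggregation against a simplified downstream operator whose per-key state
// TTL is refreshed only when a record for that key arrives.
public class MiniBatchTtlSketch {

    // Decides whether an aggregation result should be sent downstream.
    interface EmitRule {
        boolean shouldEmit(long prevAgg, long newAgg, long stateRetentionTime);
    }

    // Current mini-batch behavior: emit only if the aggregate value changed.
    static boolean emitCurrent(long prevAgg, long newAgg, long stateRetentionTime) {
        return prevAgg != newAgg;
    }

    // Proposed behavior, mirroring the non-mini-batch GroupAggFunction check:
    // suppress an unchanged result only when state retention (TTL) is disabled.
    static boolean emitProposed(long prevAgg, long newAgg, long stateRetentionTime) {
        return stateRetentionTime > 0 || prevAgg != newAgg;
    }

    // Runs one mini-batch per entry in batchTimes, each producing the same aggregate
    // value for a single key, and reports whether the downstream state for that key
    // is still alive at checkTime. The same ttl is used as the upstream
    // stateRetentionTime and as the downstream state TTL (simplifying assumption).
    static boolean downstreamAliveAt(
            EmitRule rule, List<Long> batchTimes, long aggValue, long ttl, long checkTime) {
        Long prevAgg = null; // no previous result before the first batch
        Long lastDownstreamWrite = null; // time of the last record received downstream
        for (long t : batchTimes) {
            // The first result for a key is always emitted.
            boolean emit = prevAgg == null || rule.shouldEmit(prevAgg, aggValue, ttl);
            if (emit) {
                lastDownstreamWrite = t; // a received record refreshes the downstream TTL
            }
            prevAgg = aggValue;
        }
        // Simplified expiry: alive only if fewer than ttl units passed since the last record.
        return lastDownstreamWrite != null && checkTime - lastDownstreamWrite < ttl;
    }

    public static void main(String[] args) {
        List<Long> batches = List.of(0L, 10L, 20L, 30L); // same value every 10 time units
        long ttl = 15L;
        boolean current =
                downstreamAliveAt(MiniBatchTtlSketch::emitCurrent, batches, 42L, ttl, 35L);
        boolean proposed =
                downstreamAliveAt(MiniBatchTtlSketch::emitProposed, batches, 42L, ttl, 35L);
        System.out.println("current rule, downstream alive at t=35: " + current); // false
        System.out.println("proposed rule, downstream alive at t=35: " + proposed); // true
    }
}
```

Under these assumptions, the current rule lets the downstream state for the key expire by t=35 even though the upstream kept producing the same value every 10 time units, while the proposed rule keeps it alive. When no TTL is configured (`stateRetentionTime` of 0), both rules behave identically and suppress unchanged results.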