[ 
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-33936:
-----------------------------
    Description: 
If mini-batch is enabled currently, and if the aggregated result is the same as 
the previous output, this current aggregation result will not be sent 
downstream.  This will cause downstream nodes to not receive updated data. If 
there is a TTL set for states at this time, the TTL of downstream will not be 
updated either.

The specific logic is as follows.

https://github.com/hackergin/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224

{code:java}
                    if (!equaliser.equals(prevAggValue, newAggValue)) {
                        // new row is not same with prev row
                        if (generateUpdateBefore) {
                            // prepare UPDATE_BEFORE message for previous row
                            resultRow
                                    .replace(currentKey, prevAggValue)
                                    .setRowKind(RowKind.UPDATE_BEFORE);
                            out.collect(resultRow);
                        }
                        // prepare UPDATE_AFTER message for new row
                        resultRow.replace(currentKey, 
newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                        out.collect(resultRow);
                    }
                    // new row is same with prev row, no need to output
{code}



When mini-batch is not enabled, even if the aggregation result of this time is 
the same as last time, new results will still be sent if TTL is set.

https://github.com/hackergin/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170

{code:java}

                if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
newAggValue)) {
                    // newRow is the same as before and state cleaning is not 
enabled.
                    // We do not emit retraction and acc message.
                    // If state cleaning is enabled, we have to emit messages 
to prevent too early
                    // state eviction of downstream operators.
                    return;
                } else {
                    // retract previous result
                    if (generateUpdateBefore) {
                        // prepare UPDATE_BEFORE message for previous row
                        resultRow
                                .replace(currentKey, prevAggValue)
                                .setRowKind(RowKind.UPDATE_BEFORE);
                        out.collect(resultRow);
                    }
                    // prepare UPDATE_AFTER message for new row
                    resultRow.replace(currentKey, 
newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                }
{code}


Therefore, based on the consideration of TTL scenarios, I believe that when 
mini-batch aggregation is enabled, new results should also be issued when the 
aggregated result is the same as the previous one.

  was:
If mini-batch is enabled currently, and if the aggregated result is the same as 
the previous output, this time's aggregation result will not be sent 
downstream. The specific logic is as follows. This will cause downstream nodes 
to not receive updated data. If there is a TTL set for states at this time, the 
TTL of downstream will not be updated either.

https://github.com/hackergin/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224

{code:java}
                    if (!equaliser.equals(prevAggValue, newAggValue)) {
                        // new row is not same with prev row
                        if (generateUpdateBefore) {
                            // prepare UPDATE_BEFORE message for previous row
                            resultRow
                                    .replace(currentKey, prevAggValue)
                                    .setRowKind(RowKind.UPDATE_BEFORE);
                            out.collect(resultRow);
                        }
                        // prepare UPDATE_AFTER message for new row
                        resultRow.replace(currentKey, 
newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                        out.collect(resultRow);
                    }
                    // new row is same with prev row, no need to output
{code}



When mini-batch is not enabled, even if the aggregation result of this time is 
the same as last time, new results will still be sent if TTL is set.

https://github.com/hackergin/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170

{code:java}

                if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
newAggValue)) {
                    // newRow is the same as before and state cleaning is not 
enabled.
                    // We do not emit retraction and acc message.
                    // If state cleaning is enabled, we have to emit messages 
to prevent too early
                    // state eviction of downstream operators.
                    return;
                } else {
                    // retract previous result
                    if (generateUpdateBefore) {
                        // prepare UPDATE_BEFORE message for previous row
                        resultRow
                                .replace(currentKey, prevAggValue)
                                .setRowKind(RowKind.UPDATE_BEFORE);
                        out.collect(resultRow);
                    }
                    // prepare UPDATE_AFTER message for new row
                    resultRow.replace(currentKey, 
newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                }
{code}


Therefore, based on the consideration of TTL scenarios, I believe that when 
mini-batch aggregation is enabled, new results should also be issued when the 
aggregated result is the same as the previous one.


> Mini-batch should output the result when the result is same as last if TTL is 
> setted.
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-33936
>                 URL: https://issues.apache.org/jira/browse/FLINK-33936
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.0
>            Reporter: Feng Jin
>            Priority: Major
>
> If mini-batch is enabled currently, and if the aggregated result is the same 
> as the previous output, this current aggregation result will not be sent 
> downstream.  This will cause downstream nodes to not receive updated data. If 
> there is a TTL set for states at this time, the TTL of downstream will not be 
> updated either.
> The specific logic is as follows.
> https://github.com/hackergin/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
>                     if (!equaliser.equals(prevAggValue, newAggValue)) {
>                         // new row is not same with prev row
>                         if (generateUpdateBefore) {
>                             // prepare UPDATE_BEFORE message for previous row
>                             resultRow
>                                     .replace(currentKey, prevAggValue)
>                                     .setRowKind(RowKind.UPDATE_BEFORE);
>                             out.collect(resultRow);
>                         }
>                         // prepare UPDATE_AFTER message for new row
>                         resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                         out.collect(resultRow);
>                     }
>                     // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, even if the aggregation result of this time 
> is the same as last time, new results will still be sent if TTL is set.
> https://github.com/hackergin/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
>                 if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
> newAggValue)) {
>                     // newRow is the same as before and state cleaning is not 
> enabled.
>                     // We do not emit retraction and acc message.
>                     // If state cleaning is enabled, we have to emit messages 
> to prevent too early
>                     // state eviction of downstream operators.
>                     return;
>                 } else {
>                     // retract previous result
>                     if (generateUpdateBefore) {
>                         // prepare UPDATE_BEFORE message for previous row
>                         resultRow
>                                 .replace(currentKey, prevAggValue)
>                                 .setRowKind(RowKind.UPDATE_BEFORE);
>                         out.collect(resultRow);
>                     }
>                     // prepare UPDATE_AFTER message for new row
>                     resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                 }
> {code}
> Therefore, based on the consideration of TTL scenarios, I believe that when 
> mini-batch aggregation is enabled, new results should also be issued when the 
> aggregated result is the same as the previous one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to