Hi Smile,

Thanks for bringing up this discussion.
I think you are right: it's a bug, and MiniBatchGroupAggFunction should also
do this.
You can open a JIRA issue for this, and offer to fix it if you wish.
Moreover, the current MiniBatchGroupAggFunction has a more severe bug:
it does not clean up expired state; see FLINK-17096.
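A minimal sketch of the emit rule being proposed, assuming MiniBatchGroupAggFunction can consult a stateCleaningEnabled flag equivalent to the one GroupAggFunction uses. This is a simplified, self-contained model, not Flink code: the class MiniBatchEmitSketch, the method emit, and the string messages ("-U" for UPDATE_BEFORE, "+U" for UPDATE_AFTER) are hypothetical and chosen only for readability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the per-key emit decision; not Flink code.
// Messages are plain strings using changelog short notation
// ("-U" = UPDATE_BEFORE, "+U" = UPDATE_AFTER) purely for readability.
final class MiniBatchEmitSketch {

    private MiniBatchEmitSketch() {}

    /**
     * Decides what to emit for one key once a mini-batch has been folded into its
     * accumulator. Same rule as GroupAggFunction: stay silent only when the value
     * is unchanged AND state cleaning is disabled; otherwise emit, so downstream
     * operators see the key again and do not evict its state too early.
     */
    static List<String> emit(String key, long prevAggValue, long newAggValue,
                             boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && prevAggValue == newAggValue) {
            return out; // unchanged and no state retention to honor: emit nothing
        }
        if (generateUpdateBefore) {
            out.add("-U(" + key + "," + prevAggValue + ")"); // retract previous result
        }
        out.add("+U(" + key + "," + newAggValue + ")"); // emit new result
        return out;
    }

    public static void main(String[] args) {
        // Same key, unchanged aggregate value (the repeated-key case from the thread).
        System.out.println(emit("k", 1L, 1L, false, true)); // prints []
        System.out.println(emit("k", 1L, 1L, true, true));  // prints [-U(k,1), +U(k,1)]
        // Value changed: emitted regardless of state cleaning.
        System.out.println(emit("k", 1L, 2L, false, false)); // prints [+U(k,2)]
    }
}
```

In the real operator, the change would presumably amount to adding the state-cleaning check to the `equaliser.equals(prevAggValue, newAggValue)` condition quoted below, mirroring GroupAggFunction.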


Smile <letters_sm...@163.com> wrote on Mon, Oct 12, 2020 at 6:15 PM:

> When using SELECT DISTINCT in Flink SQL, I found that when a new message
> with the same key arrives, it emits a retract message and a new insert
> message. According to FLINK-8566
> <https://issues.apache.org/jira/browse/FLINK-8566> and FLINK-8564
> <https://issues.apache.org/jira/browse/FLINK-8564>, this is a feature that
> prevents too-early state eviction in downstream operators when state
> cleaning is enabled. However, when I enabled MiniBatch Aggregation
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>,
> it stops emitting those retract and new messages.
>
> According to the source code of GroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183>
> and MiniBatchGroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>,
> when MiniBatch Aggregation is enabled, those messages are not emitted.
>
> But why not? I don't think MiniBatch Aggregation can avoid state eviction
> of downstream operators anyway.
>
> *GroupAggFunction.java:*
>
>     if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
>         // newRow is the same as before and state cleaning is not enabled.
>         // We do not emit retraction and acc message.
>         // If state cleaning is enabled, we have to emit messages to prevent too early
>         // state eviction of downstream operators.
>         return;
>     } else {
>         // retract previous result
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     }
>
>
> *MiniBatchGroupAggFunction.java:*
>
>     if (!equaliser.equals(prevAggValue, newAggValue)) {
>         // new row is not same with prev row
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>         out.collect(resultRow);
>     }
>     // new row is same with prev row, no need to output
>
>
> Flink version: 1.12-SNAPSHOT (GitHub master <https://github.com/apache/flink>)
> Planner: Blink Planner
>
>
>


-- 

Best,
Benchao Li
