Ivan Burmistrov created FLINK-35661:
---------------------------------------

             Summary: MiniBatchGroupAggFunction can silently drop records under 
certain conditions
                 Key: FLINK-35661
                 URL: https://issues.apache.org/jira/browse/FLINK-35661
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
            Reporter: Ivan Burmistrov
         Attachments: image-2024-06-20-10-46-51-347.png, 
image-2024-06-20-10-50-20-867.png, image-2024-06-20-11-05-53-253.png

h2. The story / Symptoms

One day we made a small change to a Flink job that uses Flink SQL: we added a 
couple of UDF-based aggregations (what exactly they compute is not important). 
Surprisingly, the job started producing incorrect results - wrong values for 
some aggregation keys, and some keys missing from the output entirely.

The symptoms were strange. For instance, the read/write access rate to 
accState (the internal state used by Table SQL for GROUP BY aggregations) 
dropped sharply. The screenshot below compares the read rate to this state 
with the same chart from one day earlier - they should look alike, yet there is 
a big difference after the change. The write rate showed a similar picture.

!image-2024-06-20-10-50-20-867.png|width=770,height=338!

Another interesting observation was that the GroupAggregate operator (the Table 
SQL operator responsible for GROUP BY aggregation) behaved oddly: the number of 
"records out" was disproportionately lower than the number of "records in". By 
itself this doesn't prove anything, but combined with our other observations 
about the job producing wrong results, it looked suspicious.

!image-2024-06-20-11-05-53-253.png|width=767,height=316!
h2. Digging deeper

After reverting the change, things went back to normal, so we concluded that 
the new UDF-based aggregations had caused the issue. Then we realized that we 
had accidentally forgotten to implement the *merge* method in our UDF, which 
made the planner fall back to ONE_PHASE aggregation instead of TWO_PHASE. After 
fixing the mistake and implementing *merge*, things went back to normal.
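
For context, here is a minimal sketch (class name and accumulator are illustrative, not our actual UDF) of the kind of *merge* implementation whose absence makes the planner fall back to ONE_PHASE:

{code:java}
import org.apache.flink.table.functions.AggregateFunction;

// Illustrative UDF sketch: with merge() implemented, the planner can split the
// aggregation into a local and a global stage (TWO_PHASE); without it, it
// silently falls back to ONE_PHASE.
public class SumUdf extends AggregateFunction<Long, SumUdf.Acc> {

    public static class Acc {
        public long sum = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.sum;
    }

    public void accumulate(Acc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }

    // The kind of method we had forgotten: merges partial accumulators produced
    // by the local aggregation into the global accumulator.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
        }
    }
}
{code}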

Moreover, we realized that the UDF actually has nothing to do with the issue 
(other than causing the ONE_PHASE fallback). So we reverted all the changes 
and ran the job with ONE_PHASE aggregation forced. *The issue reproduced in that mode as well.* 
So, summarizing: *when the job has mini-batch enabled, ONE_PHASE aggregation 
works incorrectly.*
h2. The bug

It was clear that the issue had something to do with 
[MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L57],
 because this is what distinguishes the ONE_PHASE mode from TWO_PHASE.

 

After reading the code, we found this interesting fragment:
{code:java}
    @Override
    public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out)
            throws Exception {
        for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
            RowData currentKey = entry.getKey();
            List<RowData> inputRows = entry.getValue();
            boolean firstRow = false;
            // step 1: get the accumulator for the current key
            // set current key to access state under the key
            ctx.setCurrentKey(currentKey);
            RowData acc = accState.value();
            if (acc == null) {
                // Don't create a new accumulator for a retraction message. This
                // might happen if the retraction message is the first message for the
                // key or after a state clean up.
                Iterator<RowData> inputIter = inputRows.iterator();
                while (inputIter.hasNext()) {
                    RowData current = inputIter.next();
                    if (isRetractMsg(current)) {
                        inputIter.remove(); // remove all the beginning retraction messages
                    } else {
                        break;
                    }
                }
                if (inputRows.isEmpty()) {
                    return; // !!!! <--- this is bad !!!!
                }
                acc = function.createAccumulators();
                firstRow = true;
            }
            // ...
        }
    }
{code}

In this code we iterate over the whole bundle, key by key, and at some point do 
this:


{code:java}
if (inputRows.isEmpty()) {
    return;
}{code}

Obviously, what was meant here is *continue* (i.e. finish with the current key 
and move on to the next one), not a full stop of the whole bundle processing.
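
For completeness, a minimal sketch of the intended behaviour inside the per-key loop (this is our reading of the code, not an official patch):

{code:java}
// If every buffered row for this key was a leading retraction, skip only this
// key and keep processing the remaining keys of the bundle.
if (inputRows.isEmpty()) {
    continue;
}{code}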


This line is reached when the bundle contains a key that has only retraction 
messages - in that case the loop below leaves inputRows empty:

 
{code:java}
while (inputIter.hasNext()) {
    RowData current = inputIter.next();
    if (isRetractMsg(current)) {
        inputIter.remove(); // remove all the beginning retraction messages
    } else {
        break;
    }
}{code}
 
h2. Summary / conditions

To summarize, the bug is triggered when:
 # Mini-batch is enabled
 # ONE_PHASE aggregation is used
 # A mini-batch bundle contains keys that have only retraction messages

When these conditions are met, MiniBatchGroupAggFunction may silently drop 
records; a configuration sketch under which the first two conditions hold 
follows below.
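
For reference, a minimal configuration sketch (class name and option values are illustrative) that enables mini-batch and forces ONE_PHASE aggregation:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OnePhaseMiniBatchRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Enable mini-batch processing (latency / size values are illustrative).
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "1 s");
        tEnv.getConfig().set("table.exec.mini-batch.size", "1000");

        // Force ONE_PHASE aggregation; the planner also falls back to it when an
        // aggregation UDF does not implement merge().
        tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "ONE_PHASE");

        // ... register sources and run a GROUP BY query whose input contains
        // keys with only retraction messages in a bundle (condition 3) ...
    }
}{code}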

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
