Ivan Burmistrov created FLINK-35661:
---------------------------------------
Summary: MiniBatchGroupAggFunction can silently drop records under certain conditions
Key: FLINK-35661
URL: https://issues.apache.org/jira/browse/FLINK-35661
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Reporter: Ivan Burmistrov
Attachments: image-2024-06-20-10-46-51-347.png, image-2024-06-20-10-50-20-867.png, image-2024-06-20-11-05-53-253.png

h2. The story / Symptoms

One day we slightly changed our Flink job, which uses Flink SQL, by adding a couple of UDF-based aggregations (what these aggregations do is not important). Surprisingly, the job started working incorrectly: it produced wrong results for some aggregation keys, or did not produce some keys at all.

The symptoms were really weird. For instance, the read / write access rate to accState (the internal state used by Table SQL for group by aggregations) dropped sharply. The screenshot compares the read rate for this state with the same chart from one day earlier. The two should behave the same, yet there is a big difference after the change. The write rate showed a similar picture.

!image-2024-06-20-10-50-20-867.png|width=770,height=338!

Another interesting observation was that the GroupAggregate operator (the one from Table SQL responsible for group by aggregation) behaved weirdly: the number of "records out" was disproportionately lower than the number of "records in". By itself this doesn't mean anything, but combined with our other observations about the job producing wrong results, it seemed suspicious.

!image-2024-06-20-11-05-53-253.png|width=767,height=316!

h2. Digging deeper

After reverting the change, things got back to normal, and we concluded that adding the new UDF-based aggregations caused the issue. Then we realized that we had accidentally forgotten to implement the *merge* method in our UDF, and this caused the planner to fall back to ONE_PHASE aggregation instead of TWO_PHASE.
After fixing the mistake and implementing *merge*, _things got back to normal._ Moreover, we realized that the UDF actually has nothing to do with the issue (except for causing the ONE_PHASE fallback). So we reverted all the changes and tested the job in ONE_PHASE mode. *The issue reproduced in this mode.*

So, summarizing: *when the job has mini-batch enabled, ONE_PHASE aggregation works incorrectly.*

h2. The bug

It was clear that the issue has something to do with [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L57] because this is what distinguishes ONE_PHASE from TWO_PHASE mode. After reading the code, we found this interesting fragment:
{code:java}
@Override
public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out)
        throws Exception {
    for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
        RowData currentKey = entry.getKey();
        List<RowData> inputRows = entry.getValue();

        boolean firstRow = false;

        // step 1: get the accumulator for the current key

        // set current key to access state under the key
        ctx.setCurrentKey(currentKey);
        RowData acc = accState.value();
        if (acc == null) {
            // Don't create a new accumulator for a retraction message. This
            // might happen if the retraction message is the first message for the
            // key or after a state clean up.
            Iterator<RowData> inputIter = inputRows.iterator();
            while (inputIter.hasNext()) {
                RowData current = inputIter.next();
                if (isRetractMsg(current)) {
                    inputIter.remove(); // remove all the beginning retraction messages
                } else {
                    break;
                }
            }
            if (inputRows.isEmpty()) {
                return; // !!!! <--- this is bad !!!!
            }
            acc = function.createAccumulators();
            firstRow = true;
        }
        // ...
}
{code}
In this code we iterate over the whole bundle key by key and at some point do this:
{code:java}
if (inputRows.isEmpty()) {
    return;
}
{code}
Obviously, what was meant here is *continue* (i.e. finish with the current key and move on to the next one), not a full stop. Because the *return* exits finishBundle altogether, every key that comes later in the iteration over the bundle is skipped: its rows are neither aggregated nor emitted.

This line is reached when the bundle contains a key that has no accumulator in state (acc == null) and only retraction messages. In this case the code below results in inputRows being empty:
{code:java}
while (inputIter.hasNext()) {
    RowData current = inputIter.next();
    if (isRetractMsg(current)) {
        inputIter.remove(); // remove all the beginning retraction messages
    } else {
        break;
    }
}
{code}

h2. Summary / conditions

To summarize, the bug is triggered when:
# Mini-batch is enabled
# ONE_PHASE aggregation is in use
# The mini-batch bundle contains a key that has no accumulator in state and only retraction messages

When these conditions are met, MiniBatchGroupAggFunction may drop some records.
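The difference between *return* and *continue* in the per-key loop can be reproduced without Flink. The following is only a toy sketch and not the Flink implementation: FinishBundleSketch, Row, process, useContinue and sampleBundle are hypothetical names introduced for illustration, rows are reduced to an integer plus a retraction flag, and every key is assumed to have no accumulator in state. A LinkedHashMap is used so that the iteration order in the example is fixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the per-key loop in finishBundle. Hypothetical names, not Flink code. */
final class FinishBundleSketch {

    /** Hypothetical stand-in for a row: a value plus a retraction flag. */
    static final class Row {
        final int value;
        final boolean retract;

        Row(int value, boolean retract) {
            this.value = value;
            this.retract = retract;
        }
    }

    /**
     * Sums the rows of each key and returns the emitted results.
     * Assumption: no key has prior state, so leading retractions are dropped.
     */
    static Map<String, Integer> process(Map<String, List<Row>> buffer, boolean useContinue) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<Row>> entry : buffer.entrySet()) {
            List<Row> rows = new ArrayList<>(entry.getValue());
            // Remove all the leading retraction messages.
            while (!rows.isEmpty() && rows.get(0).retract) {
                rows.remove(0);
            }
            if (rows.isEmpty()) {
                if (useContinue) {
                    continue; // skip only this key
                }
                return out; // reported behavior: abandons all remaining keys
            }
            int sum = 0;
            for (Row r : rows) {
                sum += r.retract ? -r.value : r.value;
            }
            out.put(entry.getKey(), sum);
        }
        return out;
    }

    /** A bundle where key "b" holds only a retraction message. */
    static Map<String, List<Row>> sampleBundle() {
        Map<String, List<Row>> buffer = new LinkedHashMap<>();
        buffer.put("a", Arrays.asList(new Row(1, false), new Row(2, false)));
        buffer.put("b", Arrays.asList(new Row(5, true)));
        buffer.put("c", Arrays.asList(new Row(7, false)));
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println("return:   " + process(sampleBundle(), false));
        System.out.println("continue: " + process(sampleBundle(), true));
    }
}
```

With *return*, only key "a" is emitted and key "c" is silently lost, even though it has a perfectly valid row. With *continue*, both "a" and "c" are emitted and only "b" is skipped.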