luolei created FLINK-35988:
------------------------------

             Summary: Reduce the number of state queries in the AppendOnlyFirstNFunction.
                 Key: FLINK-35988
                 URL: https://issues.apache.org/jira/browse/FLINK-35988
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Runtime
    Affects Versions: 1.14.0
            Reporter: luolei
In AppendOnlyFirstNFunction, processElement reads the state twice for every record once a rank has been stored for the key: state.value() is called once for the null check and a second time to obtain the rank. Each call is a separate state query, so on state backends where reads are expensive (for example RocksDB, where every value() call performs a lookup and deserializes the stored value), the redundant read adds noticeable overhead, especially when the state is large.

{code:java}
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);
    // the message must be insert-only
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    // state.value() is invoked twice here whenever the state is non-null
    int currentRank = state.value() == null ? 0 : state.value();
    // ignore the record if it does not belong to the first-n rows
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank += 1;
    state.update(currentRank);
    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}
{code}

Proposed fix: read the state once into a local variable and derive the current rank from it, saving one state query per processed record.

{code:java}
@Override
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);
    // Ensure the message is insert-only.
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    int currentRank = getCurrentRank();
    // Ignore the record if it does not belong to the first-n rows.
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank++;
    state.update(currentRank);
    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}

// Reads the state exactly once; returns 0 if no rank has been stored for this key yet.
private int getCurrentRank() throws IOException {
    Integer value = state.value();
    return value == null ? 0 : value;
}
{code}
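The effect of the change on the number of state reads can be illustrated without Flink. The following is a hypothetical, self-contained sketch: CountingValueState is an illustrative stand-in for ValueState<Integer> (not a Flink class) that only counts value() calls, and processOriginal / processOptimized reproduce just the rank bookkeeping of the two versions above, without emitting any rows.

```java
/**
 * Hypothetical, Flink-free sketch comparing the number of state reads
 * in the original and proposed first-N rank bookkeeping.
 */
public class FirstNStateReadDemo {

    // Illustrative stand-in for Flink's ValueState<Integer>; NOT a Flink class.
    static final class CountingValueState {
        private Integer stored; // null until the first update, like an empty ValueState
        int reads;              // number of value() calls observed

        Integer value() {
            reads++;
            return stored;
        }

        void update(Integer v) {
            stored = v;
        }
    }

    // Mirrors the original pattern: value() is evaluated twice when the state is non-null.
    static boolean processOriginal(CountingValueState state, long rankEnd) {
        int currentRank = state.value() == null ? 0 : state.value();
        if (currentRank >= rankEnd) {
            return false; // record is dropped
        }
        state.update(currentRank + 1);
        return true; // record would be emitted
    }

    // Mirrors the proposed pattern: value() is read once into a local variable.
    static boolean processOptimized(CountingValueState state, long rankEnd) {
        Integer value = state.value();
        int currentRank = value == null ? 0 : value;
        if (currentRank >= rankEnd) {
            return false;
        }
        state.update(currentRank + 1);
        return true;
    }

    // Feeds `records` inputs for a single key; returns {emitted, reads, finalRank}.
    static int[] run(boolean optimized, long rankEnd, int records) {
        CountingValueState state = new CountingValueState();
        int emitted = 0;
        for (int i = 0; i < records; i++) {
            boolean kept = optimized
                    ? processOptimized(state, rankEnd)
                    : processOriginal(state, rankEnd);
            if (kept) {
                emitted++;
            }
        }
        int finalRank = state.stored == null ? 0 : state.stored;
        return new int[] {emitted, state.reads, finalRank};
    }

    public static void main(String[] args) {
        int[] original = run(false, 3, 5);
        int[] optimized = run(true, 3, 5);
        System.out.println("original:  emitted=" + original[0] + ", reads=" + original[1]);
        System.out.println("optimized: emitted=" + optimized[0] + ", reads=" + optimized[1]);
    }
}
```

With rankEnd = 3 and five records for one key, both versions emit three records and end with a stored rank of 3. The original performs 9 value() calls (one for the first record, where the empty state short-circuits the ternary, then two for each of the remaining four), while the optimized version performs 5. How much this saves in a real job depends on the cost of a read in the configured state backend.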