luolei created FLINK-35988:
------------------------------
Summary: Reduce the number of state queries in the AppendOnlyFirstNFunction.
Key: FLINK-35988
URL: https://issues.apache.org/jira/browse/FLINK-35988
Project: Flink
Issue Type: New Feature
Components: Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: luolei
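In AppendOnlyFirstNFunction, processElement reads the state twice for every record whose key already has a stored rank. The expression {{state.value() == null ? 0 : state.value()}} calls {{state.value()}} once for the null check and again to fetch the value. This redundant read has a significant impact on performance, especially when the state size is large and the state lives in a backend such as RocksDB, where every read involves a lookup and deserialization.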
{code:java}
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);
    // check message should be insert only.
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    // state.value() is called twice here whenever the state is non-null:
    // once for the null check and once more to read the value.
    int currentRank = state.value() == null ? 0 : state.value();
    // ignore record if it does not belong to the first-n rows
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank += 1;
    state.update(currentRank);
    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}
{code}
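To make the double read concrete, the following stand-alone sketch counts how often the state is accessed by the original ternary expression compared with reading into a local variable first. It does not use Flink: {{StateReadCount}}, {{CountingState}}, {{rankWithTernary}} and {{rankWithLocal}} are hypothetical names, and {{CountingState}} only mimics the {{value()}}/{{update()}} pair of Flink's {{ValueState<Integer>}}.

```java
/**
 * Sketch only: CountingState is a hypothetical stand-in for Flink's
 * ValueState<Integer>. It records how often value() is called so the
 * two read patterns can be compared without a Flink runtime.
 */
class StateReadCount {

    /** Hypothetical minimal state holder; not a Flink class. */
    static final class CountingState {
        private Integer value; // null means "no state yet for this key"
        int reads = 0;         // number of value() calls so far

        Integer value() {
            reads++;
            return value;
        }

        void update(Integer newValue) {
            value = newValue;
        }
    }

    /** Pattern from the original code: calls value() twice when non-null. */
    static int rankWithTernary(CountingState state) {
        return state.value() == null ? 0 : state.value();
    }

    /** Pattern from the proposed fix: calls value() exactly once. */
    static int rankWithLocal(CountingState state) {
        Integer value = state.value();
        return value == null ? 0 : value;
    }

    public static void main(String[] args) {
        CountingState state = new CountingState();
        state.update(3);

        rankWithTernary(state);
        System.out.println("ternary reads: " + state.reads); // prints "ternary reads: 2"

        state.reads = 0;
        rankWithLocal(state);
        System.out.println("local reads: " + state.reads);   // prints "local reads: 1"
    }
}
```

With an empty state the ternary reads only once, because the null check already yields 0; once a rank is stored it reads twice, while the local-variable form always reads once.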
Proposed fix: read the state once per record into a local variable and reuse it, which removes one state query from each processElement call.
{code:java}
@Override
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);
    // Ensure the message is an insert-only operation.
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    int currentRank = getCurrentRank();
    // Ignore record if it does not belong to the first-n rows
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank++;
    state.update(currentRank);
    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}

// Reads the state exactly once; a null value means no row has been emitted yet for this key.
private int getCurrentRank() throws IOException {
    Integer value = state.value();
    return value == null ? 0 : value;
}
{code}
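The sketch below models the fixed per-key logic outside Flink to check that it reads the state exactly once per record while still emitting only the first N rows. It is a simplification under stated assumptions: {{FirstNSketch}} and {{CountingState}} are hypothetical, a single key is assumed, records are plain strings, and the rank is appended to the output string instead of calling {{collectInsert}}.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: models the per-key counter logic of the fixed
 * processElement outside Flink. CountingState is a hypothetical
 * stand-in for ValueState<Integer>; records are plain Strings.
 */
class FirstNSketch {

    /** Hypothetical minimal state holder that counts value() calls. */
    static final class CountingState {
        private Integer value; // null means "no rank stored yet"
        int reads = 0;         // number of value() calls so far

        Integer value() {
            reads++;
            return value;
        }

        void update(Integer newValue) {
            value = newValue;
        }
    }

    private final CountingState state = new CountingState();
    private final long rankEnd;                 // keep only ranks 1..rankEnd
    final List<String> out = new ArrayList<>(); // stands in for the Collector

    FirstNSketch(long rankEnd) {
        this.rankEnd = rankEnd;
    }

    /** Mirrors the fixed processElement: one state read per record. */
    void processElement(String input) {
        int currentRank = getCurrentRank();
        // Ignore records beyond the first N for this key.
        if (currentRank >= rankEnd) {
            return;
        }
        currentRank++;
        state.update(currentRank);
        out.add(input + "#" + currentRank);
    }

    private int getCurrentRank() {
        Integer value = state.value(); // the only state read per record
        return value == null ? 0 : value;
    }

    int stateReads() {
        return state.reads;
    }

    public static void main(String[] args) {
        FirstNSketch fn = new FirstNSketch(2);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            fn.processElement(r);
        }
        System.out.println(fn.out);          // prints [a#1, b#2]
        System.out.println(fn.stateReads()); // prints 4
    }
}
```

For the same four records, the original ternary would call {{value()}} 7 times (once for the first record, while the state is still null, and twice for each of the remaining three), compared with 4 times here.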