luolei created FLINK-35988:
------------------------------

             Summary: Reduce the number of state queries in AppendOnlyFirstNFunction
                 Key: FLINK-35988
                 URL: https://issues.apache.org/jira/browse/FLINK-35988
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Runtime
    Affects Versions: 1.14.0
            Reporter: luolei


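In AppendOnlyFirstNFunction, a single processElement call can read the state 
twice: the expression state.value() == null ? 0 : state.value() calls 
state.value() once for the null check and again to fetch the rank whenever a 
rank is already stored. The redundant read has a significant impact on 
performance, especially when the state size is large.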
{code:java}
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);

    // The message must be insert-only.
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    // state.value() is called twice here whenever a rank is already stored.
    int currentRank = state.value() == null ? 0 : state.value();
    // ignore record if it does not belong to the first-n rows
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank += 1;
    state.update(currentRank);

    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}{code}
 

Proposed fix: read the state once per processElement call and reuse the 
value, which removes one state query.
{code:java}
@Override
public void processElement(RowData input, Context context, Collector<RowData> out)
        throws Exception {
    initRankEnd(input);

    // Ensure the message is an insert-only operation.
    Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
    int currentRank = getCurrentRank();
    // Ignore record if it does not belong to the first-n rows
    if (currentRank >= rankEnd) {
        return;
    }
    currentRank++;
    state.update(currentRank);

    if (outputRankNumber || hasOffset()) {
        collectInsert(out, input, currentRank);
    } else {
        collectInsert(out, input);
    }
}

private int getCurrentRank() throws IOException {
    Integer value = state.value();
    return value == null ? 0 : value;
} {code}
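
As a rough illustration of the difference, the following self-contained sketch 
counts state reads for both patterns. It does not use Flink: CountingState is a 
hypothetical stand-in for the keyed ValueState<Integer>, and the class and 
method names are invented for this example. The rankEnd check is left out to 
keep the sketch short.

```java
public class StateReadCountDemo {

    /** Hypothetical stand-in for a keyed ValueState<Integer> that counts reads. */
    static final class CountingState {
        private Integer value; // null until the first update, like an empty ValueState
        private int reads;

        Integer value() {
            reads++;
            return value;
        }

        void update(Integer newValue) {
            value = newValue;
        }

        int reads() {
            return reads;
        }
    }

    /** Original pattern: the ternary calls value() a second time when the state is non-null. */
    static int rankWithTwoReads(CountingState state) {
        return state.value() == null ? 0 : state.value();
    }

    /** Proposed pattern: read once into a local variable and reuse it. */
    static int rankWithOneRead(CountingState state) {
        Integer value = state.value();
        return value == null ? 0 : value;
    }

    /** Processes the given number of elements for one key and returns the total state reads. */
    public static int countReads(boolean singleRead, int elements) {
        CountingState state = new CountingState();
        for (int i = 0; i < elements; i++) {
            int rank = singleRead ? rankWithOneRead(state) : rankWithTwoReads(state);
            state.update(rank + 1);
        }
        return state.reads();
    }

    public static void main(String[] args) {
        System.out.println("two-read pattern: " + countReads(false, 1000));
        System.out.println("one-read pattern: " + countReads(true, 1000));
    }
}
```

With 1000 elements for one key, the original pattern performs 1999 reads (one 
for the first element, when the state is still null, and two for each later 
element), while the single-read pattern performs 1000.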



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
