[ 
https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35988.
-------------------------------
    Resolution: Fixed

Fixed in master: d9931c8af05d0f1f721be9fe920690fe122507ad

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> -------------------------------------------------------------------
>
>                 Key: FLINK-35988
>                 URL: https://issues.apache.org/jira/browse/FLINK-35988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: luolei
>            Assignee: luolei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>
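> In AppendOnlyFirstNFunction, a single processElement call can read the state 
> twice: the expression {{state.value() == null ? 0 : state.value()}} calls 
> state.value() a second time whenever the stored rank is non-null. This has a 
> significant impact on performance, especially when the state size is large.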
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // check message should be insert only.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = state.value() == null ? 0 : state.value();
>     // ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank += 1;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }{code}
>  
> Proposed fix: read the state once into a local variable, which removes the 
> redundant second state query.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // Ensure the message is an insert-only operation.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = getCurrentRank();
>     // Ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank++;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }
>
> private int getCurrentRank() throws IOException {
>     Integer value = state.value();
>     return value == null ? 0 : value;
> } {code}
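
The difference between the two access patterns above can be checked in isolation. The following is a minimal sketch, not Flink code: CountingState is a hypothetical stand-in for a value state that only counts how often value() is called, and the class and method names are assumptions made for illustration.

```java
// Hypothetical stand-in for a keyed value state; it only counts reads.
// It is NOT Flink's ValueState, just a sketch for comparing access patterns.
class StateReadSketch {

    static final class CountingState {
        private Integer value; // null until the first update, like an empty state
        int reads;             // number of value() calls so far

        Integer value() {
            reads++;
            return value;
        }

        void update(Integer newValue) {
            value = newValue;
        }
    }

    // Original pattern: the ternary calls value() twice when the state is non-null.
    static int rankWithTwoReads(CountingState state) {
        return state.value() == null ? 0 : state.value();
    }

    // Revised pattern: read once into a local, then null-check the local.
    static int rankWithOneRead(CountingState state) {
        Integer value = state.value();
        return value == null ? 0 : value;
    }

    public static void main(String[] args) {
        CountingState state = new CountingState();
        state.update(3);

        int original = rankWithTwoReads(state);
        int readsForOriginal = state.reads;

        state.reads = 0;
        int revised = rankWithOneRead(state);
        int readsForRevised = state.reads;

        System.out.println("original: rank=" + original + ", reads=" + readsForOriginal);
        System.out.println("revised:  rank=" + revised + ", reads=" + readsForRevised);
    }
}
```

Both methods return the same rank. With an empty state the original pattern also reads only once, because the ternary returns 0 without evaluating its second branch, so the saving applies to every record after the first one for a given key.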



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
