[ https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee closed FLINK-35988.
-------------------------------
    Resolution: Fixed

Fixed in master: d9931c8af05d0f1f721be9fe920690fe122507ad

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> -------------------------------------------------------------------
>
>                 Key: FLINK-35988
>                 URL: https://issues.apache.org/jira/browse/FLINK-35988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: luolei
>            Assignee: luolei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>
> In the AppendOnlyFirstNFunction, there are two data reads from the state
> within a single processElement operation. This has a significant impact on
> performance, especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // check message should be insert only.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = state.value() == null ? 0 : state.value();
>     // ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank += 1;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }{code}
>
> Remedial measure: Optimize the code to reduce one state query invocation.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // Ensure the message is an insert-only operation.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = getCurrentRank();
>     // Ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank++;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }
>
> private int getCurrentRank() throws IOException {
>     Integer value = state.value();
>     return value == null ? 0 : value;
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
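
The difference between the two versions quoted in the issue can be checked with a small standalone sketch. It does not use Flink: CountingState is a hypothetical stand-in for the keyed value state that only counts how often value() is called, and acceptDoubleRead / acceptSingleRead are illustrative names that mirror the rank check before and after the change. It assumes a single key and a fixed rankEnd.

```java
final class FirstNStateReads {

    /** Hypothetical stand-in for a keyed value state; counts every read. */
    static final class CountingState {
        private Integer value; // null until the first update, like an empty state
        int reads;

        Integer value() {
            reads++;
            return value;
        }

        void update(Integer newValue) {
            value = newValue;
        }
    }

    /** Mirrors the original check: the ternary reads the state twice when it is non-null. */
    static boolean acceptDoubleRead(CountingState state, long rankEnd) {
        int currentRank = state.value() == null ? 0 : state.value();
        if (currentRank >= rankEnd) {
            return false; // record is outside the first-n rows
        }
        state.update(currentRank + 1);
        return true;
    }

    /** Mirrors the revised check: read once, then reuse the local value. */
    static boolean acceptSingleRead(CountingState state, long rankEnd) {
        Integer value = state.value();
        int currentRank = value == null ? 0 : value;
        if (currentRank >= rankEnd) {
            return false;
        }
        state.update(currentRank + 1);
        return true;
    }

    public static void main(String[] args) {
        CountingState before = new CountingState();
        CountingState after = new CountingState();
        int acceptedBefore = 0;
        int acceptedAfter = 0;
        // Five insert-only records for one key, keeping the first three.
        for (int i = 0; i < 5; i++) {
            if (acceptDoubleRead(before, 3)) {
                acceptedBefore++;
            }
            if (acceptSingleRead(after, 3)) {
                acceptedAfter++;
            }
        }
        System.out.println("double read: accepted=" + acceptedBefore + " reads=" + before.reads);
        System.out.println("single read: accepted=" + acceptedAfter + " reads=" + after.reads);
    }
}
```

In this sketch both variants accept the same three records, but the double-read variant performs 9 reads against 5 for the single-read variant: only the very first record for a key (empty state) avoids the second read. How much this matters in a real job depends on the state backend, which the sketch does not model.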