xuyangzhong commented on code in PR #25717: URL: https://github.com/apache/flink/pull/25717#discussion_r1909899286
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java: ########## @@ -112,66 +89,58 @@ public void open(OpenContext openContext) throws Exception { } dataState = getRuntimeContext().getState(valueStateDescriptor); + helper = new SyncStateFastTop1Helper(); + // metrics - registerMetric(kvCache.size() * getDefaultTopNSize()); + helper.registerMetric(); } @Override public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception { - requestCount += 1; + helper.accRequestCount(); + // load state under current key if necessary RowData currentKey = (RowData) keyContext.getCurrentKey(); - RowData prevRow = kvCache.getIfPresent(currentKey); + RowData prevRow = helper.getPrevRowFromCache(currentKey); if (prevRow == null) { prevRow = dataState.value(); } else { - hitCount += 1; + helper.accHitCount(); } // first row under current key. if (prevRow == null) { - kvCache.put(currentKey, inputRowSer.copy(input)); - if (outputRankNumber) { - collectInsert(out, input, 1); - } else { - collectInsert(out, input); - } - return; - } - - RowData curSortKey = sortKeySelector.getKey(input); - RowData oldSortKey = sortKeySelector.getKey(prevRow); - int compare = sortKeyComparator.compare(curSortKey, oldSortKey); - // current sort key is higher than old sort key - if (compare < 0) { - kvCache.put(currentKey, inputRowSer.copy(input)); - // Note: partition key is unique key if only top-1 is desired, - // thus emitting UB and UA here - if (outputRankNumber) { - collectUpdateBefore(out, prevRow, 1); - collectUpdateAfter(out, input, 1); - } else { - collectUpdateBefore(out, prevRow); - collectUpdateAfter(out, input); - } + helper.processAsFirstRow(input, currentKey, out); + } else { + helper.processWithPrevRow(input, currentKey, prevRow, out); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { - for (Map.Entry<RowData, RowData> entry : kvCache.asMap().entrySet()) { - keyContext.setCurrentKey(entry.getKey()); - flushBufferToState(entry.getValue()); - } + helper.flushAllCacheToState(); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { // nothing to do } - private void flushBufferToState(RowData rowData) throws Exception { - dataState.update(rowData); + private class SyncStateFastTop1Helper extends FastTop1Helper { + + public SyncStateFastTop1Helper() { + super( + FastTop1Function.this, + inputRowSer, + cacheSize, + FastTop1Function.this.getDefaultTopNSize()); + } + + @Override + public void flushBufferToState(RowData currentKey, RowData value) throws Exception { + keyContext.setCurrentKey(currentKey); + FastTop1Function.this.dataState.update(value); Review Comment: I don't think there's much of an issue because all access to this private class is managed within the FastTop1Function itself, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org