[ https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zakelly Lan updated FLINK-33672:
--------------------------------
Description:
In code logic related to over windows, such as
org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction
{code:java}
private transient MapState<Long, List<RowData>> inputState;

public void onTimer(
        long timestamp,
        KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
        Collector<RowData> out)
        throws Exception {
    //...
    Iterator<Long> iter = inputState.keys().iterator();
    //...
    while (iter.hasNext()) {
        Long elementKey = iter.next();
        if (elementKey < limit) {
            // element key outside of window. Retract values
            List<RowData> elementsRemove = inputState.get(elementKey);
            // ...
        }
    }
    //...
}
{code}
As we can see, the code iterates over the keys of inputState and then calls get() to fetch the value of each iterated key. However, for RocksDB, key iteration is implemented on top of entry iteration, which means we could switch to entry iteration without introducing any extra overhead. As a result, we could save a get() call for each key that falls outside the window by using getValue() on the iterated entry, at very low cost.

was:
In code logic related to over windows, such as
org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction
{code:java}
public void onTimer(
        long timestamp,
        KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
        Collector<RowData> out)
        throws Exception {
    //...
    Iterator<Long> iter = inputState.keys().iterator();
    //...
    while (iter.hasNext()) {
        Long elementKey = iter.next();
        if (elementKey < limit) {
            // element key outside of window. Retract values
            List<RowData> elementsRemove = inputState.get(elementKey);
            // ...
        }
    }
    //...
}
{code}
As we can see, the code iterates over the keys of inputState and then calls get() to fetch the value of each iterated key. However, for RocksDB, key iteration is implemented on top of entry iteration, which means we could switch to entry iteration without introducing any extra overhead.
As a result, we could save a get() call for each key that falls outside the window by using getValue() on the iterated entry, at very low cost.

> Use MapState.entries() instead of keys() and get() in over window
> -----------------------------------------------------------------
>
>                 Key: FLINK-33672
>                 URL: https://issues.apache.org/jira/browse/FLINK-33672
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Zakelly Lan
>            Priority: Major
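The change proposed in this issue could look roughly like the sketch below. It is a minimal, self-contained illustration under stated assumptions, not the Flink implementation: `CountingMapState` is a hypothetical in-memory stand-in for `MapState<Long, List<RowData>>` (backed by a `TreeMap`, with `String` in place of `RowData`), and `retractWithKeysAndGet`, `retractWithEntries` and `sampleState` are illustrative names. The stand-in does not model RocksDB; it only counts calls to `get()` to show that the `entries()` loop reads each value from the iterated entry instead of issuing a second lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: CountingMapState, retractWithKeysAndGet, retractWithEntries and
// sampleState are hypothetical names for illustration, not Flink APIs.
public class EntriesVsKeysGet {

    /**
     * Hypothetical in-memory stand-in for MapState<Long, List<RowData>>, with String
     * in place of RowData. It counts get() calls so the two loops can be compared.
     */
    public static final class CountingMapState {
        private final TreeMap<Long, List<String>> data = new TreeMap<>();
        private int getCalls = 0;

        public void put(Long key, List<String> value) {
            data.put(key, value);
        }

        // Each call stands in for a separate point lookup against the state backend.
        public List<String> get(Long key) {
            getCalls++;
            return data.get(key);
        }

        public Iterable<Long> keys() {
            return data.keySet();
        }

        public Iterable<Map.Entry<Long, List<String>>> entries() {
            return data.entrySet();
        }

        public int getCallCount() {
            return getCalls;
        }
    }

    // Current pattern: iterate keys, then call get() for every key outside the window.
    public static List<String> retractWithKeysAndGet(CountingMapState state, long limit) {
        List<String> retracted = new ArrayList<>();
        Iterator<Long> iter = state.keys().iterator();
        while (iter.hasNext()) {
            Long elementKey = iter.next();
            if (elementKey < limit) {
                List<String> elementsRemove = state.get(elementKey);
                if (elementsRemove != null) {
                    retracted.addAll(elementsRemove);
                }
            }
        }
        return retracted;
    }

    // Proposed pattern: iterate entries and take the value from the entry itself.
    public static List<String> retractWithEntries(CountingMapState state, long limit) {
        List<String> retracted = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : state.entries()) {
            if (entry.getKey() < limit) {
                List<String> elementsRemove = entry.getValue();
                if (elementsRemove != null) {
                    retracted.addAll(elementsRemove);
                }
            }
        }
        return retracted;
    }

    // Hypothetical sample data: keys 10 and 15 fall outside a window with limit 20.
    public static CountingMapState sampleState() {
        CountingMapState state = new CountingMapState();
        state.put(10L, Arrays.asList("a", "b"));
        state.put(15L, Arrays.asList("c"));
        state.put(25L, Arrays.asList("d"));
        return state;
    }

    public static void main(String[] args) {
        CountingMapState viaGetState = sampleState();
        List<String> viaGet = retractWithKeysAndGet(viaGetState, 20L);
        CountingMapState viaEntriesState = sampleState();
        List<String> viaEntries = retractWithEntries(viaEntriesState, 20L);
        // Both loops retract the same values; only the keys() + get() loop calls get().
        System.out.println(viaGet + ", get() calls: " + viaGetState.getCallCount());
        System.out.println(viaEntries + ", get() calls: " + viaEntriesState.getCallCount());
    }
}
```

Running `main` prints `[a, b, c], get() calls: 2` followed by `[a, b, c], get() calls: 0`. In Flink itself, `MapState.entries()` returns `Iterable<Map.Entry<UK, UV>>` and, like `keys()`, is declared to throw `Exception`, so the same loop shape should carry over to `inputState` inside `onTimer`. Because the in-memory map has no RocksDB iterator behind it, the sketch says nothing about actual cost; it only shows that the per-key `get()` disappears.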