[ https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946699#comment-17946699 ]
Renxiang Zhou commented on FLINK-37685:
---------------------------------------

[~zakelly] Got it, thanks for pointing it out. (y)
We added some unit tests in our internal version, and some of them failed because we were using asyncProcessWithKey incorrectly.

> process the callback immediately when the processing key equals to the key in record context
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37685
>                 URL: https://issues.apache.org/jira/browse/FLINK-37685
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Async State Processing
>    Affects Versions: 2.0.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>
> Currently, asyncProcessWithKey creates a new record context on every call.
> When the processing key equals the key of the current record context, we can
> process the callback directly, without building and switching to a new
> context, to improve processing efficiency.
>
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
>     RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
>     // -----------
>     // If the key equals the current context's key, process the callback
>     // directly at a sync point.
>     if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
>         asyncExecutionController.syncPointRequestWithCallback(processing, true);
>         return;
>     }
>     // -----------
>     // build a context and switch to the new context
>     RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
>     newContext.retain();
>     asyncExecutionController.setCurrentContext(newContext);
>     // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
>     // pass the same key in.
>     asyncExecutionController.syncPointRequestWithCallback(processing, true);
>     newContext.release();
>     // switch to original context
>     asyncExecutionController.setCurrentContext(oldContext);
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)