Renxiang Zhou created FLINK-37685:
-------------------------------------

             Summary: process the callback immediately when the processing key equals to the key in record context
                 Key: FLINK-37685
                 URL: https://issues.apache.org/jira/browse/FLINK-37685
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Async State Processing
    Affects Versions: 2.0.0
         Environment: Currently, asyncProcessWithKey creates a new record context on every call. When the processing key equals the key of the current record context, the callback can be processed directly, which avoids building and switching contexts and improves processing efficiency.
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
    // If the requested key equals the key of the current context, process the
    // callback directly at a sync point without building a new context.
    if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }
    // Build a context and switch to the new context.
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the
    // call passes in the same key.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();
    // Switch back to the original context.
    asyncExecutionController.setCurrentContext(oldContext);
}
{code}

            Reporter: Renxiang Zhou


--
This message was sent by Atlassian Jira
(v8.20.10#820010)