Renxiang Zhou created FLINK-37685:
-------------------------------------

             Summary: process the callback immediately when the processing key 
equals to the key in record context
                 Key: FLINK-37685
                 URL: https://issues.apache.org/jira/browse/FLINK-37685
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Async State Processing
    Affects Versions: 2.0.0
         Environment: Currently, the implementation of asyncProcessWithKey 
needs to create a new record context for each call. When the processing key 
equals to the current record context key, we can directly process the Callback 
to improve processing efficiency.

 
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> 
processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();


    // if the current key is equals to the old context key, we just process it 
with sync point.
    if (oldContext != null && oldContext.getKey() != null && 
oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }

     // build a context and switch to the new context
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, 
key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when 
the call's key
    // pass the same key in.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();

    // switch to original context
    asyncExecutionController.setCurrentContext(oldContext);
} {code}
 

 

 
            Reporter: Renxiang Zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to