[ 
https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946699#comment-17946699
 ] 

Renxiang Zhou commented on FLINK-37685:
---------------------------------------

[~zakelly] 

Get it, thanks for pointing it out.(y)

we add some unit tests in our internal version, and some unit tests are failed 
since we use asyncProcessWithKey in a wrong way.

 

> process the callback immediately when the processing key equals to the key in 
> record context
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37685
>                 URL: https://issues.apache.org/jira/browse/FLINK-37685
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Async State Processing
>    Affects Versions: 2.0.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>
> Currently, the implementation of asyncProcessWithKey needs to create a new 
> record context for each call. When the processing key equals to the current 
> record context key, we can directly process the Callback to improve 
> processing efficiency.
>  
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> 
> processing) {
>     RecordContext<K> oldContext = 
> asyncExecutionController.getCurrentContext();
>     -----------
>     // if the current key is equals to the old context key, we just process 
> it with sync point.
>     if (oldContext != null && oldContext.getKey() != null && 
> oldContext.getKey().equals(key)) {
>         asyncExecutionController.syncPointRequestWithCallback(processing, 
> true);
>         return;
>     }
>     -----------
>     // build a context and switch to the new context
>     RecordContext<K> newContext = asyncExecutionController.buildContext(null, 
> key, true);
>     newContext.retain();
>     asyncExecutionController.setCurrentContext(newContext);
>     // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic 
> when the call's key
>     // pass the same key in.
>     asyncExecutionController.syncPointRequestWithCallback(processing, true);
>     newContext.release();
>     // switch to original context
>     asyncExecutionController.setCurrentContext(oldContext);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to