[ 
https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17944932#comment-17944932
 ] 

Zakelly Lan commented on FLINK-37685:
-------------------------------------

[~zhourenxiang]

Could you please explain more about the use case? This method is intended to 
open another process in current process respecting the ordering control. Even 
if it is the same key we are facing, the context is changed if using this 
method. Otherwise, just write the code in serial instead of boxing it in the 
`ThrowingRunnable<Exception>`.
e.g. 

{code:java}
// Some comments here
public void processElement()
{
    valueState1.asyncGet();
    valueState2.asyncGet();  // same key with same context, just run without 
asyncProcessWithKey
}
{code}


> process the callback immediately when the processing key equals to the key in 
> record context
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37685
>                 URL: https://issues.apache.org/jira/browse/FLINK-37685
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Async State Processing
>    Affects Versions: 2.0.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>
> Currently, the implementation of asyncProcessWithKey needs to create a new 
> record context for each call. When the processing key equals to the current 
> record context key, we can directly process the Callback to improve 
> processing efficiency.
>  
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> 
> processing) {
>     RecordContext<K> oldContext = 
> asyncExecutionController.getCurrentContext();
>     -----------
>     // if the current key is equals to the old context key, we just process 
> it with sync point.
>     if (oldContext != null && oldContext.getKey() != null && 
> oldContext.getKey().equals(key)) {
>         asyncExecutionController.syncPointRequestWithCallback(processing, 
> true);
>         return;
>     }
>     -----------
>     // build a context and switch to the new context
>     RecordContext<K> newContext = asyncExecutionController.buildContext(null, 
> key, true);
>     newContext.retain();
>     asyncExecutionController.setCurrentContext(newContext);
>     // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic 
> when the call's key
>     // pass the same key in.
>     asyncExecutionController.syncPointRequestWithCallback(processing, true);
>     newContext.release();
>     // switch to original context
>     asyncExecutionController.setCurrentContext(oldContext);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to