[ 
https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31901:
-----------------------------
    Fix Version/s: ml-2.4.0
                       (was: ml-2.3.0)

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when 
> processing cached records
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31901
>                 URL: https://issues.apache.org/jira/browse/FLINK-31901
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / Machine Learning
>            Reporter: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.4.0
>
>
> Currently `BroadcastUtils#withBroadcast` tries to cache the non-broadcast 
> input until all broadcast inputs have been processed. After the broadcast 
> variables are ready, we first process the cached records and then continue to 
> process the newly arrived records.
>  
> Cached elements are processed via `Input#processElement` and 
> `Input#processWatermark`. However, processing the cached elements may take a 
> long time when there are many cached records, which can block the checkpoint 
> barrier.
>  
> If we run the code snippet here[1], we get logs like the following.
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
> processed cached records, cnt: 10000 at time: 1682319149569
> processed cached records, cnt: 20000 at time: 1682319149614
> processed cached records, cnt: 30000 at time: 1682319149655
> processed cached records, cnt: 40000 at time: 1682319149702
> processed cached records, cnt: 50000 at time: 1682319149746
> processed cached records, cnt: 60000 at time: 1682319149781
> processed cached records, cnt: 70000 at time: 1682319149891
> processed cached records, cnt: 80000 at time: 1682319150011
> processed cached records, cnt: 90000 at time: 1682319150116
> processed cached records, cnt: 100000 at time: 1682319150199
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007{code}
>  
> From line#2 to line#11 of the log there are no checkpoints: the barriers 
> are blocked until all cached elements are processed, which takes ~600ms, 
> much longer than the checkpoint interval (i.e., 100ms).
>  
>   [1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case
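
To make the blocking behaviour described above concrete, here is a minimal, self-contained sketch. It does not use the Flink API and it is not the fix adopted for this issue. The names `CachedReplayDemo`, `recordsProcessedBeforeBarrier` and `batchSize` are hypothetical. It assumes a single task thread that can only handle a pending checkpoint barrier when the replay loop returns control, and it counts how many cached records are processed before that happens.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative simulation only: no Flink classes are involved.
class CachedReplayDemo {

    /**
     * Replays `cached` records on one simulated task thread.
     * A barrier becomes pending once `barrierArrivesAfter` records have been
     * processed, but it can only be handled when the replay loop yields,
     * which happens after every `batchSize` records.
     * Returns the number of records processed before the barrier is handled.
     */
    static int recordsProcessedBeforeBarrier(int cached, int barrierArrivesAfter, int batchSize) {
        Queue<Integer> cache = new ArrayDeque<>();
        for (int i = 0; i < cached; i++) {
            cache.add(i);
        }

        int processed = 0;
        while (!cache.isEmpty()) {
            // Process one batch without yielding to the task loop.
            for (int i = 0; i < batchSize && !cache.isEmpty(); i++) {
                cache.poll();
                processed++;
            }
            // Control is back in the task loop: a pending barrier can be handled now.
            if (processed >= barrierArrivesAfter) {
                return processed;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int cached = 100_000;
        // Whole cache replayed in one call: the barrier waits for all records.
        System.out.println(recordsProcessedBeforeBarrier(cached, 1, cached)); // prints 100000
        // Replay in bounded batches: the barrier waits for at most one batch.
        System.out.println(recordsProcessedBeforeBarrier(cached, 1, 1_000));  // prints 1000
    }
}
```

With the whole cache replayed in a single call, a barrier that arrives right after the first record waits for all 100000 records, which corresponds to the gap between checkpoint 1 and checkpoint 2 in the log. With a bounded batch the wait is limited to one batch. Whether the operator should yield in batches or use some other mechanism is left open here.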



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
