[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated FLINK-31901:
-----------------------------
    Fix Version/s: ml-2.4.0
                       (was: ml-2.3.0)

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31901
>                 URL: https://issues.apache.org/jira/browse/FLINK-31901
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / Machine Learning
>            Reporter: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.4.0
>
>
> Currently, `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, the operator first processes the cached records and then continues with newly arriving records.
>
> Cached elements are processed by calling `Input#processElement` and `Input#processWatermark`. Because there may be many cached records, this can take a long time, and checkpoint barriers are blocked for all of it.
>
> If we run the code snippet here[1], we get logs like the following:
> {code:java}
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
> processed cached records, cnt: 10000 at time: 1682319149569
> processed cached records, cnt: 20000 at time: 1682319149614
> processed cached records, cnt: 30000 at time: 1682319149655
> processed cached records, cnt: 40000 at time: 1682319149702
> processed cached records, cnt: 50000 at time: 1682319149746
> processed cached records, cnt: 60000 at time: 1682319149781
> processed cached records, cnt: 70000 at time: 1682319149891
> processed cached records, cnt: 80000 at time: 1682319150011
> processed cached records, cnt: 90000 at time: 1682319150116
> processed cached records, cnt: 100000 at time: 1682319150199
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
> OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007
> {code}
>
> From line #2 to line #11 of the log, no checkpoint is taken: the barriers are blocked until all cached elements have been processed, which takes ~600 ms, much longer than the checkpoint interval (100 ms).
>
> [1] https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case
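The issue does not prescribe a fix. One common way to avoid this kind of blocking is to drain the cache in bounded chunks and yield back to the task's mailbox between chunks, so that a barrier arriving in the meantime is handled after at most one chunk. The sketch below illustrates that idea only, under stated assumptions: it does not depend on Flink, the `ArrayDeque` of `Runnable`s merely stands in for the mailbox-based threading model of Flink tasks, and `ChunkedCacheDrainDemo`, `recordsProcessedBeforeBarrier`, and the chunk size are hypothetical names and values, not the actual Flink ML change.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, Flink-independent simulation of a single-threaded task mailbox.
 * It contrasts draining all cached records in one step with draining them in
 * bounded chunks that yield back to the mailbox between chunks.
 */
class ChunkedCacheDrainDemo {

    /** Returns how many cached records had been processed when the barrier ran. */
    static int recordsProcessedBeforeBarrier(int cachedRecords, int chunkSize) {
        Deque<Runnable> mailbox = new ArrayDeque<>();
        int[] processed = {0};      // cached records handled so far
        int[] seenAtBarrier = {-1}; // snapshot taken when the barrier mail runs

        // Processes up to chunkSize cached records, then re-enqueues itself at the
        // tail of the mailbox so other mails (e.g. a barrier) can run in between.
        Runnable[] drainStep = new Runnable[1];
        drainStep[0] = () -> {
            int end = Math.min(cachedRecords, processed[0] + chunkSize);
            while (processed[0] < end) {
                processed[0]++; // stand-in for processing one cached record
            }
            if (processed[0] < cachedRecords) {
                mailbox.addLast(drainStep[0]);
            }
        };

        mailbox.addLast(drainStep[0]);
        // The (simulated) checkpoint barrier arrives right after draining starts.
        mailbox.addLast(() -> seenAtBarrier[0] = processed[0]);

        // Single-threaded mailbox loop: run mails strictly in FIFO order.
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run();
        }
        return seenAtBarrier[0];
    }

    public static void main(String[] args) {
        int cached = 100_000; // same record count as in the log above
        // One step drains everything, so the barrier waits for all cached records.
        System.out.println("blocking: " + recordsProcessedBeforeBarrier(cached, cached));
        // The barrier waits for at most one chunk.
        System.out.println("chunked:  " + recordsProcessedBeforeBarrier(cached, 1_000));
    }
}
```

Running `main` reports 100000 records processed before the barrier in the blocking variant and 1000 in the chunked variant. The chunk size trades per-mail overhead against barrier latency: smaller chunks let barriers through sooner but re-enqueue the continuation more often.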