[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cai Liuyang updated FLINK-32316:
--------------------------------
    Description: 
When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after a JobManager failover, when the job is automatically recovered from a checkpoint.

I think the fix is to schedule the announceCombinedWatermark task in SourceCoordinator::start rather than in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, the SourceCoordinator is created twice:
* The first is created when the JobMaster is created: building the DefaultExecutionGraph initializes the first SourceCoordinator but does not start it.
* The second is created when the JobMaster calls restoreLatestCheckpointedStateInternal, which resets the old SourceCoordinator and initializes a new one. Because the first SourceCoordinator was never started (a SourceCoordinator is started before SchedulerBase::startScheduling), it is not fully closed.

We also found another problem; see jira: https://issues.apache.org/jira/browse/FLINK-32362

  was:
When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after a JobManager failover, when the job is automatically recovered from a checkpoint.
I think the fix is to schedule the announceCombinedWatermark task in SourceCoordinator::start rather than in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and automatically recovers the job, the SourceCoordinator is created twice:
* The first is created when the JobMaster is created: building the DefaultExecutionGraph initializes the first SourceCoordinator but does not start it.
* The second is created when the JobMaster calls restoreLatestCheckpointedStateInternal, which resets the old SourceCoordinator and initializes a new one. Because the first SourceCoordinator was never started (a SourceCoordinator is started before SchedulerBase::startScheduling), it is not fully closed.

-We also found another problem: announceCombinedWatermark may throw an exception (such as "subtask 25 is not ready yet to receive events", when that subtask may be failing over), which stops the periodic task from running (the scheduled executor does not run a periodic task again once an execution throws an exception). I think we should make the announceCombinedWatermark function more robust to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] ).- (see jira: https://issues.apache.org/jira/browse/FLINK-32362)

> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32316
>                 URL: https://issues.apache.org/jira/browse/FLINK-32316
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0
>            Reporter: Cai Liuyang
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
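A minimal Java sketch of the proposed lifecycle change, assuming a hypothetical, heavily simplified `AlignedCoordinatorSketch` class (this is not Flink's `SourceCoordinator`, and the `Runnable` passed in merely stands in for `announceCombinedWatermark`). The periodic task is scheduled only in `start()` and cancelled in `close()`, so a coordinator that is created but never started, like the first one built with the DefaultExecutionGraph, leaves nothing scheduled behind. The task body is also wrapped in a try/catch, because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once one execution throws, which is the robustness problem tracked in FLINK-32362.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a source coordinator; NOT Flink's
// SourceCoordinator. The Runnable plays the role of announceCombinedWatermark.
class AlignedCoordinatorSketch implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable announceCombinedWatermark;
    private final long periodMillis;
    // Stays null until start(); the constructor never schedules anything.
    private ScheduledFuture<?> announceFuture;

    AlignedCoordinatorSketch(Runnable announceCombinedWatermark, long periodMillis) {
        this.announceCombinedWatermark = announceCombinedWatermark;
        this.periodMillis = periodMillis;
    }

    // Schedule the periodic announcement only when the coordinator is started.
    synchronized void start() {
        if (announceFuture != null) {
            return; // already started: never schedule a second copy of the task
        }
        announceFuture = executor.scheduleAtFixedRate(
                this::announceSafely, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // scheduleAtFixedRate suppresses every later run once an execution throws,
    // so catch runtime exceptions (e.g. a subtask not ready to receive events).
    private void announceSafely() {
        try {
            announceCombinedWatermark.run();
        } catch (RuntimeException e) {
            System.err.println("announceCombinedWatermark failed, retrying next period: " + e);
        }
    }

    synchronized boolean isAnnouncing() {
        return announceFuture != null && !announceFuture.isDone();
    }

    // Closing an unstarted coordinator is safe: there is no task to cancel.
    @Override
    public synchronized void close() {
        if (announceFuture != null) {
            announceFuture.cancel(false);
        }
        executor.shutdownNow();
    }
}
```

Making `start()` idempotent also guards against a second call scheduling a duplicate task. Whether the actual Flink fix takes this shape is not confirmed by this issue; the sketch only illustrates the idea of tying the periodic task to `start()`/`close()` instead of the constructor.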