[ https://issues.apache.org/jira/browse/FLINK-22682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346114#comment-17346114 ]
Piotr Nowojski commented on FLINK-22682:
----------------------------------------

Based on the discrepancy between the logged checkpoint duration and the actual times at which the "Triggering checkpoint" and "Completed checkpoint" messages were logged:
{noformat}
2021-05-14 10:14:05,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 9 (type=CHECKPOINT) @ 1620987245495 for job 58df7eb721aaefbfb08168b2c3fd6717.
2021-05-14 10:15:09,160 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 9 for job 58df7eb721aaefbfb08168b2c3fd6717 (7306603281 bytes in 7096 ms).
{noformat}
(7 s vs. more than a minute), and on the fact that the "duration" timer starts ticking before "Triggering checkpoint" is logged, it looks like something between stopping the timer in {{PendingCheckpoint#finalizeCheckpoint}} and logging "Completed checkpoint" is taking a long time. These are the methods called between those two events on the happy path:
{code:java}
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
(...)
try {
    completedCheckpointStore.addCheckpoint(...);
    (...)
} finally {
    pendingCheckpoints.remove(checkpointId);
    scheduleTriggerRequest();
}

rememberRecentCheckpointId(checkpointId);

// drop those pending checkpoints that are prior to the completed one
dropSubsumedCheckpoints(checkpointId);
{code}
Of those, the most likely culprit seems to be {{completedCheckpointStore.addCheckpoint}}.

> Checkpoint interval too large for higher DOP
> --------------------------------------------
>
> Key: FLINK-22682
> URL: https://issues.apache.org/jira/browse/FLINK-22682
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0, 1.13.1
> Reporter: Arvid Heise
> Priority: Major
>
> When running a job on EMR with a degree of parallelism (DOP) of 160, checkpoints
> are not triggered at the set interval.
> I used an interval of 10 s and the average checkpoint took under 10 s, so I'd expect ~6 checkpoints per minute.
> However, the "Completed checkpoint" messages were heavily delayed, so checkpoints completed only about once per minute. I had backpressure in this test and used unaligned checkpoints.
> {noformat}
> 2021-05-14 10:06:39,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 58df7eb721aaefbfb08168b2c3fd6717 (8940061335 bytes in 9097 ms).
> 2021-05-14 10:06:39,205 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:06:40,223 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CHECKPOINT) @ 1620986800082 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:07:49,263 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job 58df7eb721aaefbfb08168b2c3fd6717 (8286704522 bytes in 6490 ms).
> 2021-05-14 10:07:49,281 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 2 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:07:49,443 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 3 (type=CHECKPOINT) @ 1620986869281 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:08:55,679 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 3 for job 58df7eb721aaefbfb08168b2c3fd6717 (7398457668 bytes in 6182 ms).
> 2021-05-14 10:08:55,694 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 3 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:08:55,820 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=CHECKPOINT) @ 1620986935694 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:09:58,024 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 4 for job 58df7eb721aaefbfb08168b2c3fd6717 (7402199053 bytes in 6179 ms).
> 2021-05-14 10:09:58,035 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 4 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:09:58,154 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=CHECKPOINT) @ 1620986998035 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:11:02,694 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 5 for job 58df7eb721aaefbfb08168b2c3fd6717 (7395692776 bytes in 6788 ms).
> 2021-05-14 10:11:02,705 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 5 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:11:02,830 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 6 (type=CHECKPOINT) @ 1620987062705 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:12:05,043 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 6 for job 58df7eb721aaefbfb08168b2c3fd6717 (7388207757 bytes in 6025 ms).
> 2021-05-14 10:12:05,054 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 6 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:12:05,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 7 (type=CHECKPOINT) @ 1620987125054 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:13:04,754 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 7 for job 58df7eb721aaefbfb08168b2c3fd6717 (7360227162 bytes in 6469 ms).
> 2021-05-14 10:13:04,779 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 7 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:13:04,902 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 8 (type=CHECKPOINT) @ 1620987184779 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:14:05,486 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 8 for job 58df7eb721aaefbfb08168b2c3fd6717 (7110043004 bytes in 5982 ms).
> 2021-05-14 10:14:05,495 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 8 as completed for source Source: Sequence Source -> Map.
> 2021-05-14 10:14:05,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 9 (type=CHECKPOINT) @ 1620987245495 for job 58df7eb721aaefbfb08168b2c3fd6717.
> 2021-05-14 10:15:09,160 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 9 for job 58df7eb721aaefbfb08168b2c3fd6717 (7306603281 bytes in 7096 ms).
> 2021-05-14 10:15:09,171 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 9 as completed for source Source: Sequence Source -> Map.
> {noformat}
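The comment's arithmetic (7 s reported vs. more than a minute between the log lines) can be repeated for any checkpoint in the log. The sketch below compares the wall-clock time from the trigger timestamp (the "@ <epoch millis>" value in "Triggering checkpoint") to the "Completed checkpoint" log line against the duration reported in that line. It is a hypothetical helper: {{CheckpointGap}}, {{Sample}}, {{wallClockMillis}} and {{unaccountedMillis}} are illustrative names, not Flink APIs. It assumes the log timestamps are in UTC, which matches the logs: 1620987245495 ms is 2021-05-14 10:14:05.495 UTC, the instant checkpoint 8 was marked completed. It also assumes the reported duration is measured from that same trigger timestamp.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): compares the duration reported in
 * "Completed checkpoint ... in N ms" with the wall-clock time between the
 * "@ <epoch millis>" trigger timestamp and the "Completed checkpoint" log line.
 * Assumes the log timestamps are written in UTC.
 */
public class CheckpointGap {

    // Timestamp format of the log lines above, e.g. "2021-05-14 10:15:09,160".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** One checkpoint as seen in the log. */
    static final class Sample {
        final long id;
        final long triggerEpochMillis;       // value after "@" in "Triggering checkpoint"
        final String completedLogTimestamp;  // timestamp of the "Completed checkpoint" line
        final long reportedMillis;           // "in N ms" from the "Completed checkpoint" line

        Sample(long id, long triggerEpochMillis, String completedLogTimestamp, long reportedMillis) {
            this.id = id;
            this.triggerEpochMillis = triggerEpochMillis;
            this.completedLogTimestamp = completedLogTimestamp;
            this.reportedMillis = reportedMillis;
        }
    }

    /** Wall-clock millis from the trigger timestamp until "Completed checkpoint" was logged. */
    static long wallClockMillis(long triggerEpochMillis, String completedLogTimestamp) {
        Instant completed = LocalDateTime.parse(completedLogTimestamp, LOG_TS)
                .toInstant(ZoneOffset.UTC); // assumption: logs are in UTC
        return Duration.between(Instant.ofEpochMilli(triggerEpochMillis), completed).toMillis();
    }

    /** Wall-clock time not covered by the reported checkpoint duration. */
    static long unaccountedMillis(long triggerEpochMillis, String completedLogTimestamp,
                                  long reportedMillis) {
        return wallClockMillis(triggerEpochMillis, completedLogTimestamp) - reportedMillis;
    }

    public static void main(String[] args) {
        // Values copied from the checkpoint 8 and 9 log lines above.
        List<Sample> samples = List.of(
                new Sample(8, 1620987184779L, "2021-05-14 10:14:05,486", 5982),
                new Sample(9, 1620987245495L, "2021-05-14 10:15:09,160", 7096));

        for (Sample s : samples) {
            long wall = wallClockMillis(s.triggerEpochMillis, s.completedLogTimestamp);
            long gap = unaccountedMillis(s.triggerEpochMillis, s.completedLogTimestamp, s.reportedMillis);
            System.out.printf("checkpoint %d: wall-clock %d ms, reported %d ms, unaccounted %d ms%n",
                    s.id, wall, s.reportedMillis, gap);
        }
    }
}
```

For checkpoint 9 this gives 63665 ms of wall-clock time against 7096 ms reported, and for checkpoint 8 it gives 60707 ms against 5982 ms. Under the assumptions above, the roughly 55 s difference approximates the time spent after the timer stopped in {{PendingCheckpoint#finalizeCheckpoint}}, which is the window the comment points at.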