Hey all, I've been trying to debug a job recovery performance issue, and I'm seeing some events in the timeline that I didn't expect. Here's a brief outline of the first checkpoint after a job restart:
1. All tasks are deployed and transition into the RUNNING state.
2. I see "{} - restoring state" logs (from TwoPhaseCommitSinkFunction's initializeState) for a subset of the tasks.
3. A checkpoint is triggered: "Triggering checkpoint {} @ {} for job {}."
4. I see more "{} - restoring state" logs.
5. The checkpoint completes: "Completed checkpoint {} for job {} ({} bytes in {} ms)."

The 2 questions I have are:

1. Are the initializations in 4), in the middle of a checkpoint, expected? Since all the tasks transition to RUNNING in 1), I would have thought they were initialized there as well.
2. Are the initializations in 4) causing the checkpoint to take longer to complete? During the checkpoint I do see "{} - checkpoint {} complete, committing transaction {} from checkpoint {}" logs (from TwoPhaseCommitSinkFunction's notifyCheckpointComplete method), which suggests that the Kafka producers from 2) and 4) are contributing to the checkpoint.

Thanks!
-Teng
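P.S. In case it helps to pin down which TwoPhaseCommitSinkFunction hooks produce those log lines, below is a bare-bones tracing sink I've been thinking of running alongside the job. It's only a sketch under my own assumptions: the class name LoggingTwoPhaseSink and the String "transaction" type are placeholders (not our real sink), and I'm assuming a Flink version where SinkFunction.Context is non-generic.

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: logs each transaction hook so its timing can be compared against
// the "Triggering checkpoint" / "Completed checkpoint" lines from the coordinator.
public class LoggingTwoPhaseSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingTwoPhaseSink.class);

    public LoggingTwoPhaseSink() {
        // TXN and CONTEXT are both simple placeholder types here.
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() throws Exception {
        // Called from initializeState() after restore, and again after each snapshotState().
        String txn = "txn-" + System.nanoTime();
        LOG.info("beginTransaction -> {}", txn);
        return txn;
    }

    @Override
    protected void invoke(String transaction, String value, Context context) throws Exception {
        // Writes would be buffered against the open transaction; no-op for tracing.
    }

    @Override
    protected void preCommit(String transaction) throws Exception {
        // Called from snapshotState(), i.e. the synchronous part of the checkpoint.
        LOG.info("preCommit {}", transaction);
    }

    @Override
    protected void commit(String transaction) {
        // Called from notifyCheckpointComplete(), and during initializeState() when
        // pre-committed transactions from before the restart are recovered.
        LOG.info("commit {}", transaction);
    }

    @Override
    protected void abort(String transaction) {
        LOG.info("abort {}", transaction);
    }
}

If the "restoring state" entries in 4) line up with commits of recovered transactions from a sink like this, that would at least tell me whether those tasks are doing recovery work inside the checkpoint window.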