Thanks for reaching out to the Flink community. I will respond on the JIRA ticket.
Cheers,
Till

On Wed, Feb 3, 2021 at 1:59 PM simpleusr <ceyhanka...@gmail.com> wrote:

> Hi,
>
> I am trying to upgrade from 1.5.5 to 1.12, and the checkpointing mechanism
> seems to be broken in our DataStream jobs that use the Kafka source connector.
>
> Since there is a significant version gap and there are many
> backward-incompatible or deprecated changes in the Flink runtime between
> these versions, I had to modify our jobs, and I noticed that checkpoint
> offsets are not committed to Kafka for source connectors.
>
> To simplify the issue, I created simple reproducer projects:
>
> https://github.com/simpleusr/flink_problem_1.5.5
>
> https://github.com/simpleusr/flink_problem_1.12.0
>
> It seems that there are major changes in the checkpoint infrastructure.
>
> With 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
> (please note that the sample project contains a small hack in
> org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster
> from stopping):
>
> [2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> ....................
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> However, with 1.12.0, checkpoint cycles get stuck at the initial checkpoint:
>
> [2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> As far as I can see, the checkpoint cycle is stuck waiting in
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
> coordinatorCheckpointsComplete, although coordinatorsToCheckpoint is
> empty:
>
> > final CompletableFuture<?> coordinatorCheckpointsComplete =
> >     pendingCheckpointCompletableFuture
> >         .thenComposeAsync((pendingCheckpoint) ->
> >             OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
> >                 coordinatorsToCheckpoint, pendingCheckpoint, timer),
> >             timer);
>
> Simply returning from
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
> when coordinatorsToCheckpoint is empty seems to resolve the problem:
>
> [2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> I have also created an issue for this:
>
> https://issues.apache.org/jira/browse/FLINK-21248
>
> Please let me know if I am missing something, or if there is another
> solution that does not require a code change.
>
> We need to perform the upgrade and modify our jobs as soon as possible (I
> hope no other breaking changes come up), so any help will be appreciated.
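The early-return workaround described in the report can be illustrated in plain Java. This is a minimal, hypothetical sketch using only `java.util.concurrent`: `CoordinatorCheckpointSketch` and `triggerAllCoordinators` are illustrative names, not Flink's actual `OperatorCoordinatorCheckpoints` code, and each `Supplier` merely stands in for one operator coordinator's asynchronous checkpoint acknowledgement. The `thenComposeAsync(..., timer)` chain mirrors the shape of the quoted `CheckpointCoordinator` snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class CoordinatorCheckpointSketch {

    /**
     * Hypothetical stand-in for triggering checkpoints on all operator coordinators.
     * Each Supplier produces one coordinator's asynchronous acknowledgement future.
     */
    static CompletableFuture<Void> triggerAllCoordinators(
            List<Supplier<CompletableFuture<Void>>> coordinators) {
        // Early return suggested in the report: with no coordinators there is
        // nothing to wait for, so hand back an already-completed future.
        if (coordinators.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (Supplier<CompletableFuture<Void>> coordinator : coordinators) {
            acks.add(coordinator.get());
        }
        // Completes only once every coordinator has acknowledged.
        return CompletableFuture.allOf(acks.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) throws Exception {
        // Plays the role of the "timer" executor in the quoted snippet.
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pendingCheckpoint =
                    CompletableFuture.completedFuture("checkpoint-1");
            // Same chaining shape as coordinatorCheckpointsComplete, with an empty coordinator list.
            CompletableFuture<Void> coordinatorCheckpointsComplete =
                    pendingCheckpoint.thenComposeAsync(
                            pending -> triggerAllCoordinators(List.of()), timer);
            // Fails with a TimeoutException instead of hanging forever if the phase never completes.
            coordinatorCheckpointsComplete.get(1, TimeUnit.SECONDS);
            System.out.println("coordinator phase complete: " + coordinatorCheckpointsComplete.isDone());
        } finally {
            timer.shutdown();
        }
    }
}
```

Running `main` prints `coordinator phase complete: true`. In this plain-JDK sketch, `CompletableFuture.allOf()` with no arguments would also complete immediately; the explicit guard simply makes the empty case obvious instead of relying on how the aggregation step treats an empty input. Whether this matches how FLINK-21248 was resolved upstream is not established by this thread.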