Thanks for reaching out to the Flink community. I will respond on the JIRA
ticket.

Cheers,
Till

On Wed, Feb 3, 2021 at 1:59 PM simpleusr <ceyhanka...@gmail.com> wrote:

> Hi
>
> I am trying to upgrade from 1.5.5 to 1.12, and the checkpointing mechanism
> seems to be broken in our DataStream jobs that read from a Kafka connector.
>
> Since there is a significant version gap, with many backward-incompatible or
> deprecated changes in the Flink runtime between versions, I had to modify
> our jobs, and I noticed that checkpoint offsets are not committed to Kafka
> for source connectors.
>
> To simplify the issue, I created simple reproducer projects:
>
> https://github.com/simpleusr/flink_problem_1.5.5
>
> https://github.com/simpleusr/flink_problem_1.12.0
>
> It seems that there are major changes in the checkpoint infrastructure.
>
> For 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
> (please note that the sample project contains a small hack in
> org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster
> from stopping):
>
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> ....................
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
>
> However, for 1.12.0 the checkpoint cycle gets stuck at the initial checkpoint:
>
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
>
> As far as I can see, the checkpoint cycle is stuck waiting in
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
> coordinatorCheckpointsComplete, although coordinatorsToCheckpoint is
> empty...
>
>
> final CompletableFuture<?> coordinatorCheckpointsComplete =
>     pendingCheckpointCompletableFuture
>         .thenComposeAsync((pendingCheckpoint) ->
>             OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                 coordinatorsToCheckpoint, pendingCheckpoint, timer),
>             timer);
>
>
> Simply returning early from
>
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
> when coordinatorsToCheckpoint is empty seems to resolve the problem:
>
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
>
> I have also created an issue for this:
>
> https://issues.apache.org/jira/browse/FLINK-21248
>
>
> Please let me know if I am missing something or if there is another solution
> that does not require a code change.
>
> We need to perform the upgrade and modify our jobs as soon as possible (I
> hope there are no other breaking changes), so any help will be
> appreciated.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
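
The early-return workaround described in the quoted message can be
illustrated with a small, self-contained sketch. It does not use or
reproduce Flink's internals: EmptyCoordinatorSketch and triggerAll are
hypothetical names, and plain CompletableFutures stand in for coordinator
checkpoints. It only shows the general pattern of returning an
already-completed future when there is nothing to wait for, and it makes no
claim about the actual root cause tracked in FLINK-21248.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class EmptyCoordinatorSketch {

    /**
     * Hypothetical stand-in for a "trigger all coordinators" step.
     * Each supplier starts one coordinator checkpoint and returns its future.
     */
    static CompletableFuture<Void> triggerAll(
            Collection<Supplier<CompletableFuture<Void>>> coordinators) {
        // Early return: nothing to wait for, so hand back a completed future.
        if (coordinators.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        // Otherwise start every coordinator and wait for all of them.
        CompletableFuture<?>[] pending = coordinators.stream()
                .map(Supplier::get)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> none = triggerAll(List.of());
        System.out.println("empty list done: " + none.isDone());
    }
}
```

Note that CompletableFuture.allOf called with no futures also returns an
already-completed future, so in this sketch the guard mostly makes the
intent explicit. Whether the equivalent guard is needed in the real
CheckpointCoordinator path is a question for the JIRA ticket.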
