Hi Vijay, if you don't specify a checkpoint, then Flink assumes you want to start from scratch (e.g., you had a bug in your business logic and need to start completely without state). If there is any failure and Flink restarts automatically, it will always pick up from the latest checkpoint [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
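To make the two cases concrete, here is a minimal sketch of how such a job is usually wired up (not your exact code; the stream name, region, interval, and restart-strategy values are placeholders): with a restart strategy configured, an automatic restart after a failure restores the latest completed checkpoint, whereas starting the job anew without a checkpoint means the Kinesis consumer has no restored state and simply begins at its configured initial position, e.g. LATEST, so records sent during the downtime are skipped.

// Minimal sketch only - stream name, region, and numbers are placeholders.
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpointing, as in your job.
        env.enableCheckpointing(60_000L);

        // On a failure, Flink restarts the job itself and restores the latest
        // completed checkpoint, so the Kinesis reading positions are recovered.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        // If the job is started anew (without -s <checkpoint/savepoint>), there is
        // no restored source state and the consumer begins at STREAM_INITIAL_POSITION;
        // with LATEST, everything sent while the job was down is skipped.
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-west-2");
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        DataStream<String> records = env.addSource(
                new FlinkKinesisConsumer<>("your-stream", new SimpleStringSchema(), consumerConfig));
        records.print();

        env.execute("kinesis-initial-position-sketch");
    }
}

To restart manually and keep the reading position, pass the retained externalized checkpoint explicitly with bin/flink run -s <checkpoint path>, as in your command below.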
On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav <contact....@gmail.com> wrote:

> Thanks, it was working fine with: bin/flink run -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>
> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for your response. I did not restart from the checkpoint. I
>> assumed Flink would look for a checkpoint upon restart automatically.
>>
>> Should I restart like below?
>>
>> bin/flink run -s
>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>>
>> Thanks,
>> Vijay
>>
>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Vijay,
>>>
>>> Edit: after re-reading your message: are you sure that you restarted from
>>> a checkpoint/savepoint? If you just start the application anew and use the
>>> LATEST initial position, this is the expected behavior.
>>>
>>> --- original intended answer, if you did restart from a checkpoint ---
>>>
>>> This is definitely not the expected behavior.
>>>
>>> To exclude certain error sources:
>>> - Could you double-check whether this also happens if you don't use
>>> unaligned checkpoints? (I don't really think this is caused by unaligned
>>> checkpoints, but it's better to be sure and we want to reduce the possible
>>> error sources.)
>>> - Can you still see the missing messages in Kinesis?
>>> - Could you extract all INFO log statements from
>>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>>> - How long did you wait with recovery?
>>>
>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> We are trying to make sure we are not losing data when the Kinesis
>>>> consumer is down.
>>>>
>>>> The Kinesis streaming job has the following checkpointing properties:
>>>>
>>>> // checkpoint every X msecs
>>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>>
>>>> // enable externalized checkpoints which are retained after job cancellation
>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>
>>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>>
>>>> // enables the experimental unaligned checkpoints
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>>
>>>> // checkpoint path
>>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>>
>>>> 1) We killed the Kinesis job.
>>>> 2) Sent messages to KDS while the consumer was down.
>>>> 3) Restarted the Flink consumer; messages which were sent during the
>>>> consumer downtime were never ingested (data loss).
>>>> 4) Re-sent messages to KDS while the consumer was still up. Messages
>>>> did ingest fine.
>>>>
>>>> How can I avoid data loss for #3?
>>>>
>>>> From the logs:
>>>>
>>>> 2021-04-07 12:15:49,161 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>>>
>>>> 2021-04-07 12:16:02,343 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>>> 2021-04-07 12:16:11,951 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>>>> 2021-04-07 12:16:12,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>>
>>>> Thanks,
>>>> Vijay