Thank you, it helped.
> On Apr 8, 2021, at 10:53 PM, Arvid Heise <ar...@apache.org> wrote:
>
> Hi Vijay,
>
> if you don't specify a checkpoint, then Flink assumes you want to start from
> scratch (e.g., you had a bug in your business logic and need to start
> completely without state).
>
> If there is any failure and Flink restarts automatically, it will always pick
> up from the latest checkpoint [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
>
>> On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>> Thanks, it is working fine with:
>>
>> bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>>
>>> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com> wrote:
>>> Hi Arvid,
>>>
>>> Thanks for your response. I did not restart from the checkpoint. I assumed
>>> Flink would look for a checkpoint upon restart automatically.
>>>
>>> Should I restart like below?
>>>
>>> bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>>>
>>> Thanks,
>>> Vijay
>>>
>>>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>>>> Hi Vijay,
>>>>
>>>> Edit: after re-reading your message, are you sure that you restarted from a
>>>> checkpoint/savepoint? If you just start the application anew and use the
>>>> LATEST initial position, this is the expected behavior.
>>>>
>>>> --- original answer, intended for the case where you restart from a checkpoint ---
>>>>
>>>> This is definitely not the expected behavior.
>>>>
>>>> To exclude certain error sources:
>>>> - Could you double-check whether this also happens if you don't use
>>>>   unaligned checkpoints? (I don't really think this is caused by unaligned
>>>>   checkpoints, but it's better to be sure and we want to reduce the possible
>>>>   error sources.)
>>>> - Can you still see the missing messages in Kinesis?
>>>> - Could you extract all INFO log statements from
>>>>   org.apache.flink.streaming.connectors.kinesis and attach them here?
>>>> - How long did you wait for recovery?
>>>>
>>>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>>>>> Hi Team,
>>>>>
>>>>> We are trying to make sure we are not losing data when the Kinesis consumer
>>>>> is down.
>>>>>
>>>>> The Kinesis streaming job has the following checkpointing properties:
>>>>>
>>>>> // checkpoint every X msecs
>>>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>>> // enable externalized checkpoints which are retained after job cancellation
>>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>>> // enables the experimental unaligned checkpoints
>>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>>> // checkpoint path
>>>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>>>
>>>>> 1) We killed the Kinesis job.
>>>>> 2) Sent messages to KDS while the consumer was down.
>>>>> 3) Restarted the Flink consumer; the messages sent during the downtime were
>>>>>    never ingested (data loss).
>>>>> 4) Re-sent messages to KDS while the consumer was up. Those messages were
>>>>>    ingested fine.
>>>>>
>>>>> How can I avoid data loss for #3?
>>>>>
>>>>> From the logs:
>>>>>
>>>>> 2021-04-07 12:15:49,161 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>>>> 2021-04-07 12:16:02,343 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>>>> 2021-04-07 12:16:11,951 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>>>>> 2021-04-07 12:16:12,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>>>
>>>>> Thanks,
>>>>> Vijay
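For reference, below is a minimal, self-contained sketch of the retained-checkpoint setup discussed in this thread, assuming Flink 1.12 and an S3 filesystem plugin on the classpath. The checkpoint interval, S3 path, placeholder source, and job name are illustrative values, not the original job's settings (the original used a Conf helper class for these).

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000L);

        // Retain checkpoints after cancellation so a manually stopped job can be
        // resumed from them with: bin/flink run -s <checkpoint-path> ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Prefer the latest checkpoint over a more recent savepoint during recovery (Flink 1.12 API).
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        // Experimental unaligned checkpoints, as in the original job.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Write checkpoint state to S3 asynchronously (placeholder bucket/path).
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints", true));

        // Placeholder source; the real job reads from Kinesis via FlinkKinesisConsumer.
        env.fromElements("a", "b", "c").print();

        env.execute("retained-checkpoint-sketch");
    }
}

With this configuration, an automatic restart after a failure resumes from the latest checkpoint on its own, but a job that was cancelled or killed must be restarted with -s pointing at a retained chk-<n> directory (as in the bin/flink run -s command shown in the thread); otherwise the Kinesis consumer starts from its configured initial position and skips records produced during the downtime.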