Hi Arvid,

Thanks for your response. I did not restart from the checkpoint. I assumed Flink would automatically look for a checkpoint upon restart.
Should I restart like below?

bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \

Thanks,
Vijay

On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Vijay,
>
> Edit: after re-reading your message, are you sure that you restarted from a
> checkpoint/savepoint? If you just start the application anew and use the
> LATEST initial position, this is the expected behavior.
>
> --- original intended answer, if you did restart from a checkpoint ---
>
> This is definitely not the expected behavior.
>
> To exclude certain error sources:
> - Could you double-check if this also happens when you don't use unaligned
>   checkpoints? (I don't really think this is because of unaligned
>   checkpoints, but it's better to be sure and we want to reduce the
>   possible error sources.)
> - Can you still see the missing messages in Kinesis?
> - Could you extract all INFO log statements from
>   org.apache.flink.streaming.connectors.kinesis and attach them here?
> - How long did you wait before recovering?
>
> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>
>> Hi Team,
>>
>> We are trying to make sure we do not lose data when the Kinesis consumer
>> is down.
>>
>> The Kinesis streaming job has the following checkpointing properties:
>>
>> // checkpoint every X msecs
>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>
>> // enable externalized checkpoints which are retained after job cancellation
>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>
>> // enable the experimental unaligned checkpoints
>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>
>> // checkpoint path
>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>
>> 1) We killed the Kinesis job.
>> 2) Sent messages to KDS while the consumer was down.
>> 3) Restarted the Flink consumer; the messages sent during the consumer's
>>    downtime were never ingested (data loss).
>> 4) Re-sent messages to KDS while the consumer was up; these messages were
>>    ingested fine.
>>
>> How can I avoid the data loss in step 3?
>>
>> From the logs:
>>
>> 2021-04-07 12:15:49,161 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>> 2021-04-07 12:16:02,343 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>> 2021-04-07 12:16:11,951 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>> 2021-04-07 12:16:12,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>
>> Thanks,
>> Vijay
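
For reference, a fully spelled-out submission built around the restore command at the top of this mail could look like the sketch below. Only the -s checkpoint path is taken from this thread; the entry class and jar name are placeholders for whatever the job actually uses:

    # resubmit the job, restoring its state from the retained (externalized) checkpoint
    bin/flink run \
      -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
      -c com.example.KinesisIngestJob \
      /path/to/kinesis-ingest-job.jar

The -s (--fromSavepoint) option accepts the directory of a retained checkpoint as well as a savepoint path; without it, a plain bin/flink run starts the job with empty state, so nothing is restored.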
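
The checkpoint settings quoted above, pulled together into a self-contained helper with the imports they need. The Conf.* calls from the original snippet are replaced by plain parameters, and the class and parameter names here are illustrative, not from the actual job:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class CheckpointSetup {

        /** Applies the checkpointing settings from the quoted job to the given environment. */
        public static void configure(StreamExecutionEnvironment env,
                                     long checkpointIntervalMs,
                                     boolean preferCheckpointForRecovery,
                                     String checkpointPath) {
            // checkpoint every checkpointIntervalMs milliseconds
            env.enableCheckpointing(checkpointIntervalMs);

            // keep completed checkpoints after cancellation so they can later be
            // passed to "flink run -s ..."
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // prefer the latest checkpoint for recovery even if a more recent savepoint exists
            env.getCheckpointConfig().setPreferCheckpointForRecovery(preferCheckpointForRecovery);

            // experimental unaligned checkpoints, as in the original job
            env.getCheckpointConfig().enableUnalignedCheckpoints();

            // write checkpoints to the given (e.g. S3) path, asynchronously
            env.setStateBackend(new FsStateBackend(checkpointPath, true));
        }
    }

In the original job the three parameters would come from the Conf helper, e.g. CheckpointSetup.configure(env, Conf.getFlinkCheckpointInterval(), Conf.isFlinkCheckpointPreferCheckPoint(), Conf.getFlinkCheckPointPath()).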
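
On Arvid's point about the LATEST initial position, here is a minimal sketch of a Kinesis source using the connector's STREAM_INITIAL_POSITION setting; the stream name and region are placeholders and nothing in it is taken from the actual job:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

            // LATEST is only consulted on a fresh start with no restored state: the
            // consumer begins at the tip of the stream, so records produced while the
            // job was down are skipped. When the job is resubmitted with
            // "flink run -s <checkpoint>", the shard sequence numbers stored in the
            // checkpoint take precedence and reading resumes where it left off.
            props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

            env.addSource(new FlinkKinesisConsumer<>(
                            "my-input-stream",        // placeholder stream name
                            new SimpleStringSchema(),
                            props))
               .print();

            env.execute("kinesis-consumer-sketch");
        }
    }

That distinction is what Arvid's first question is getting at: started anew with LATEST, the gap in step 3 is expected; restored with -s, it would not be.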