Hi Vijay,

Edit: after re-reading your message, are you sure that you are restarting
from a checkpoint/savepoint? If you simply start the application anew with
the LATEST initial position, this is the expected behavior.
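
To illustrate the point about the initial position, here is a minimal sketch of the consumer properties involved. The key strings are the ones I believe the Kinesis connector defines as AWSConfigConstants.AWS_REGION and ConsumerConfigConstants.STREAM_INITIAL_POSITION; the class name, the helper method, and the region value are hypothetical and not taken from your job.

```java
import java.util.Properties;

public class KinesisConsumerProps {
    // Assumed to match AWSConfigConstants.AWS_REGION in the Flink Kinesis connector.
    static final String AWS_REGION = "aws.region";
    // Assumed to match ConsumerConfigConstants.STREAM_INITIAL_POSITION.
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Builds the Properties object that would be handed to the Kinesis consumer.
    // initialPosition is typically "LATEST" or "TRIM_HORIZON".
    static Properties build(String region, String initialPosition) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(STREAM_INITIAL_POSITION, initialPosition);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical region; with LATEST, a job started without restored state
        // begins at the tip of the stream and skips records sent while it was down.
        Properties props = build("us-east-1", "LATEST");
        System.out.println(props.getProperty(STREAM_INITIAL_POSITION));
    }
}
```

The initial position only matters when the job starts without restored state. To resume from a retained checkpoint, the job has to be submitted with the checkpoint path (flink run -s <path to the retained checkpoint> ...); the consumer then continues from the sequence numbers stored in that checkpoint.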

--- Original intended answer, assuming you restart from a checkpoint

This is definitely not the expected behavior.

To rule out some possible error sources:
- Could you double-check whether this also happens without unaligned
checkpoints? (I don't really think unaligned checkpoints are the cause, but
it's better to be sure, and we want to narrow down the possible error
sources.)
- Can you still see the missing messages in Kinesis?
- Could you extract all INFO log statements from
org.apache.flink.streaming.connectors.kinesis and attach them here?
- How long did you wait before recovering the job?



On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Team,
>
> We are trying to make sure we are not losing data when the Kinesis
> consumer is down.
>
> Our Kinesis streaming job has the following checkpointing properties:
>
> // checkpoint every X msecs
> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>
> // enable externalized checkpoints which are retained after job
> // cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> // allow job recovery fallback to checkpoint when there is a more
> // recent savepoint
> env.getCheckpointConfig().setPreferCheckpointForRecovery(
>     Conf.isFlinkCheckpointPreferCheckPoint());
>
> // enables the experimental unaligned checkpoints
> env.getCheckpointConfig().enableUnalignedCheckpoints();
>
> // checkpoint path
> env.setStateBackend(
>     new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>
> 1) We killed the Kinesis job.
> 2) We sent messages to KDS while the consumer was down.
> 3) We restarted the Flink consumer; *the messages sent while the
> consumer was down were never ingested (data loss).*
> 4) We re-sent messages to KDS while the consumer was up. These messages
> were ingested fine.
>
> *How can I avoid data loss for #3?*
>
> From Logs:
>
> 2021-04-07 12:15:49,161 INFO
>  org.apache.flink.runtime.jobmaster.JobMaster                  - Using
> application-defined state backend: File State Backend (checkpoints:
> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
> TRUE, fileStateThreshold: -1)
>
> 2021-04-07 12:16:02,343 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
> ms).
>
> 2021-04-07 12:16:11,951 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
> 8943d16e22b8aaf65d6b9e2b8bd54113.
>
> 2021-04-07 12:16:12,483 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
> ms).
>
> Thanks,
> Vijay
>
