Thank you, it helped.

> On Apr 8, 2021, at 10:53 PM, Arvid Heise <ar...@apache.org> wrote:
> 
> 
> Hi Vijay,
> 
> if you don't specify a checkpoint, then Flink assumes you want to start from 
> scratch (e.g., you had a bug in your business logic and need to start 
> completely without state).
> 
> If there is any failure and Flink restarts automatically, it will always pick 
> up from the latest checkpoint [1].
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
> 
>> On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav <contact....@gmail.com> 
>> wrote:
>> Thanks, it was working fine with:
>> 
>> bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>> 
>>> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com> 
>>> wrote:
>>> Hi Arvid,
>>> 
>>> Thanks for your response. I did not restart from the checkpoint. I assumed 
>>> Flink would look for a checkpoint upon restart automatically. 
>>> 
>>> Should I restart like below?
>>> 
>>> bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>>> 
>>> Thanks,
>>> Vijay
>>> 
>>>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>>>> Hi Vijay,
>>>> 
>>>> Edit: after re-reading your message, are you sure that you restarted from a 
>>>> checkpoint/savepoint? If you just start the application anew and use the 
>>>> LATEST initial position, this is the expected behavior.
>>>> 
>>>> --- original intended answer if you restart from checkpoint
>>>> 
>>>> This is definitely not the expected behavior.
>>>> 
>>>> To exclude certain error sources:
>>>> - Could you double-check if this is also happening if you don't use 
>>>> unaligned checkpoints? (I don't really think this is because of unaligned 
>>>> checkpoint, but it's better to be sure and we want to reduce the possible 
>>>> error sources)
>>>> - Can you see the missing messages still in Kinesis?
>>>> - Could you extract all log INFO statements from 
>>>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>>>> - How long did you wait before recovering?
>>>> 
>>>> 
>>>> 
>>>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com> 
>>>>> wrote:
>>>>> Hi Team,
>>>>> 
>>>>> We are trying to make sure we do not lose data when the Kinesis consumer 
>>>>> is down.
>>>>> 
>>>>> Our Kinesis streaming job has the following checkpointing properties:
>>>>> 
>>>>> // checkpoint every X msecs
>>>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>>> // enable externalized checkpoints which are retained after job cancellation
>>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>>>>> );
>>>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>>> // enables the experimental unaligned checkpoints
>>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>>> // checkpoint path
>>>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>>> 
>>>>> 1) We killed the Kinesis job.
>>>>> 2) We sent messages to KDS while the consumer was down.
>>>>> 3) We restarted the Flink consumer; the messages sent while the consumer 
>>>>> was down were never ingested (data loss).
>>>>> 4) We re-sent messages to KDS while the consumer was up. These messages 
>>>>> were ingested fine.
>>>>> 
>>>>> How can I avoid the data loss in #3?
>>>>> 
>>>>> From Logs:
>>>>> 
>>>>> 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>>>> 
>>>>> 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>>>> 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>>>>> 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>>> 
>>>>> Thanks,
>>>>> Vijay

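For anyone landing on this thread with the same question: the fix discussed here was to pass the retained checkpoint path to `bin/flink run -s` explicitly, since a manually restarted job does not look for a checkpoint on its own. The sketch below is not part of the original exchange and is not a Flink API. It is a hypothetical, standard-library-only Java helper (`LatestCheckpoint` is an invented name and the `example-bucket` paths are placeholders) that picks the highest-numbered `chk-N` directory from a listing obtained some other way. It assumes every listed directory is a completed checkpoint, so it is worth confirming that the chosen directory contains a `_metadata` file before restoring from it.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LatestCheckpoint {
    // Matches a retained checkpoint directory name at the end of a path,
    // e.g. ".../chk-384" or ".../chk-384/". The capture group is the number.
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)/?$");

    /** Returns the path with the highest chk-N number, or empty if none match. */
    public static Optional<String> latest(List<String> paths) {
        String best = null;
        long bestId = -1;
        for (String path : paths) {
            Matcher m = CHK.matcher(path);
            if (m.find()) {
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    best = path;
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // Placeholder listing; in practice this would come from listing the
        // job's checkpoint directory (assumption: done outside this helper).
        List<String> listing = List.of(
            "s3://example-bucket/flink/checkpoint/job-a/chk-9/",
            "s3://example-bucket/flink/checkpoint/job-a/chk-384/",
            "s3://example-bucket/flink/checkpoint/job-a/chk-10/");
        // Print the restore command for the most recent checkpoint, if any.
        latest(listing).ifPresent(p -> System.out.println("bin/flink run -s " + p));
    }
}
```

The numeric suffix is compared as a number rather than as a string because `chk-9` sorts after `chk-384` lexicographically, which would select a stale checkpoint.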