Hi Vijay,

if you don't specify a checkpoint, then Flink assumes you want to start
from scratch (e.g., you had a bug in your business logic and need to start
completely without state).

If there is any failure and Flink restarts automatically, it will always
pick up from the latest checkpoint [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery

On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Thanks it was working fine with: bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for your response. I did not restart from the checkpoint. I
>> assumed Flink would look for a checkpoint upon restart automatically.
>>
>> *I should restart like below ?*
>>
>> bin/flink run  -s
>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
>> \
>>
>> Thanks,
>> Vijay
>>
>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Vijay,
>>>
>>> edit: After re-reading your message: are you sure that you restart from
>>> a checkpoint/savepoint? If you just start the application anew and use
>>> LATEST initial position, this is the expected bahvior.
>>>
>>> --- original intended answer if you restart from checkpoint
>>>
>>> this is definitively not the expected behavior.
>>>
>>> To exclude certain error sources:
>>> - Could you double-check if this is also happening if you don't use
>>> unaligned checkpoints? (I don't really think this is because of unaligned
>>> checkpoint, but it's better to be sure and we want to reduce the possible
>>> error sources)
>>> - Can you see the missing messages still in Kinesis?
>>> - Could you extract all log INFO statements from
>>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>>> - How long did you wait with recovery?
>>>
>>>
>>>
>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> We are trying to make sure we are not losing data when KINESIS Consumer
>>>> is down.
>>>>
>>>> Kinesis streaming Job which has following checkpointing properties:
>>>>
>>>>
>>>> *// checkpoint every X msecs
>>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());*
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *// enable externalized checkpoints which are retained after job
>>>> cancellation
>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>>>>   );// allow job recovery fallback to checkpoint when there is a more
>>>> recent savepoint
>>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>>  //
>>>> enables the experimental unaligned checkpoints
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();*
>>>>
>>>> *//checkpointpath*
>>>> *        env.setStateBackend(new
>>>> FsStateBackend(Conf.getFlinkCheckPointPath(), true));*
>>>>
>>>> 1) We killed the Kinesis Job
>>>> 2) Sent messages to KDS while Consumer was down.
>>>> 3) Restarted Flink Consumer, *messages which were sent during the
>>>> Consumer down period, never ingested (data loss).*
>>>> 4) Re-sent messages to KDS while the consumer was still up. Messages
>>>> did ingest fine.
>>>>
>>>> *How can I avoid data loss for #3 ??*
>>>>
>>>> From Logs:
>>>>
>>>>
>>>> *2021-04-07 12:15:49,161 INFO
>>>>  org.apache.flink.runtime.jobmaster.JobMaster                  - Using
>>>> application-defined state backend: File State Backend (checkpoints:
>>>> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
>>>> TRUE, fileStateThreshold: -1)*
>>>>
>>>>
>>>>
>>>> *2021-04-07 12:16:02,343 INFO
>>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
>>>> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
>>>> ms).2021-04-07 12:16:11,951 INFO
>>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
>>>> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
>>>> 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
>>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
>>>> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
>>>> ms).*
>>>>
>>>> Thanks,
>>>> Vijay
>>>>
>>>

Reply via email to