Hi,

> A job id subdirectory is created when you restart the job manually. In a
> failover scenario, the previous job id directory will be used.

We figured that one out. On that particular cluster we probably had
scenarios where the job manager failed to start, or where we ended up
changing the HA clusterId to force a resume from a savepoint. This can be
the case when we change something in the application and cannot then resume
from a checkpoint. So I suspect we will have some housekeeping to do
ourselves in these scenarios. Keep learning, they said...
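
A minimal sketch of what that housekeeping could look like, assuming we
already have a listing of the object keys under the checkpoint directory
(how the listing is obtained is left out) and that we know which job ids are
still live. The function name and its parameters are hypothetical, not part
of Flink. It only reports candidate directories and deletes nothing.

```python
from typing import Iterable, Set


def orphaned_job_dirs(keys: Iterable[str], root: str, live_job_ids: Set[str]) -> Set[str]:
    """Return job id subdirectories under `root` that are not in `live_job_ids`."""
    prefix = root.rstrip("/") + "/"
    found = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        # First path segment below the checkpoint root.
        job_id = key[len(prefix):].split("/", 1)[0]
        # Assumption: job id directories are 32 lowercase hex characters,
        # like 0ee0d82bfdce9dc3099c75c9be20ceda. Anything else is ignored.
        if len(job_id) == 32 and all(c in "0123456789abcdef" for c in job_id):
            found.add(job_id)
    return found - live_job_ids
```

The result is meant for review before anything is removed, since a job id
directory may still be needed if the job is later resumed from one of its
retained checkpoints.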

> That's weird. Are these like 'chk-x'? Normally they will be deleted by
> flink.

Yes, they are chk-xxxxx, where xxxxx is a checkpoint number. In the "happy
scenario" we only see one of these. But as this particular cluster ran
out of disk space on S3 (Flink isn't the only S3 client), I wonder what
happened in that case. We have tons of errors in the jobmanager logs about
failed checkpoints:

Failed to trigger or complete checkpoint 249035 for job 0ee0d82bfdce9dc3099c75c9be20ceda. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
        at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
        ... 3 more
Caused by: java.lang.Exception: java.lang.Exception: Could not materialize checkpoint 249035 for operator...
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to s3://...
Caused by: java.io.IOException: java.io.IOException: Could not flush to file and close the file system output stream to s3://flink-application/checkpoints/0ee0d82bfdce9dc3099c75c9be20ceda/chk-249035/38ea0d2e-01c2-442d-a9ce-
Caused by: java.io.IOException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: 's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: 1814D5301995C01C; S3 Extended Request ID: 2900d23ebe8b1609641456f9dce4f33cd2d2dfdae941a07c7ebbe0b8d30cd745; Proxy: null), S3 Extended Request ID: 2900d23ebe8b1609641456f9dce4f33cd2d2dfdae941a07c7ebbe0b8d30cd745

So at that point (we have a parallelism of 6) we think only a subset of the
checkpoint made it to storage, and some parts of it returned an error. So
yes, the checkpoint is invalid, but some of it made it to storage. What
is then meant to happen at the next checkpoint, if it succeeds?
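
To get a feel for how many of these partial checkpoints we have, here is a
rough sketch that flags chk-N directories with no `_metadata` object. It
assumes, as far as I understand the layout, that a completed checkpoint has
a `_metadata` file in its chk-N directory, and it works on a plain list of
object keys. The function name is hypothetical, not a Flink API.

```python
import re
from typing import Dict, Iterable, List

# Matches ".../chk-<number>/<rest of key>" and captures the number and the rest.
CHK_DIR = re.compile(r"(?:^|/)chk-(\d+)/(.+)$")


def checkpoints_without_metadata(keys: Iterable[str]) -> List[int]:
    """Return checkpoint numbers whose chk-N directory holds no _metadata object."""
    has_metadata: Dict[int, bool] = {}
    for key in keys:
        match = CHK_DIR.search(key)
        if not match:
            continue  # e.g. objects outside any chk-N directory
        chk_id = int(match.group(1))
        is_meta = match.group(2) == "_metadata"
        has_metadata[chk_id] = has_metadata.get(chk_id, False) or is_meta
    return sorted(chk_id for chk_id, ok in has_metadata.items() if not ok)
```

A checkpoint that is still in progress would also show up here, so the
highest number in the result should probably be ignored or rechecked later
rather than treated as leftover.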

> Did you trigger the savepoint periodically?

Yes we do, and yes, we clean these up. We typically keep the last two. We
also read the savepoint to ensure it is readable and complete before
deleting the previous ones.
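
The retention rule we apply is roughly the following. This is a simplified
sketch, not our actual tooling: the function name, the (path, timestamp)
tuples and the `is_readable` callback are all illustrative assumptions.

```python
from typing import Callable, List, Tuple


def savepoints_to_delete(
    savepoints: List[Tuple[str, float]],
    is_readable: Callable[[str], bool],
    keep: int = 2,
) -> List[str]:
    """Return savepoint paths older than the newest `keep` verified savepoints."""
    newest_first = sorted(savepoints, key=lambda sp: sp[1], reverse=True)
    kept = 0
    deletable = []
    for path, _created_at in newest_first:
        if kept >= keep:
            # Enough verified savepoints are retained; older ones can go.
            deletable.append(path)
        elif is_readable(path):
            kept += 1
        # An unreadable savepoint among the newest is neither counted nor
        # deleted, so it stays around for inspection.
    return deletable
```

With this rule nothing is deleted until two newer savepoints have been read
back successfully.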


Thanks

JM

On Fri, Jan 3, 2025 at 3:36 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi,
>
>
>> We noticed it first when running out of space on the S3 PV. We worked out
>> the subdirectory is the job Id, so I don't know how that is meant to be
>> cleaned up when the job is recreated with a new job Id . It might be we
>> need to work something out outside of flink.
>>
>
> A job id subdirectory is created when you restart the job manually. In a
> failover scenario, the previous job id directory will be used.
>
>> But we also noticed a huge number of subdirectories below that (under the
>> current job Id) , and I am not sure why they are not cleaned up.
>>
>
> That's weird. Are these like 'chk-x'? Normally they will be deleted by
> flink.
>
>> I do not remember anything about a state TTL, so it's probably not set. Is
>> that code ? a property? If so which one.
>
>
> Please read
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
> . And if it is a SQL job, please set 'table.exec.state.ttl'.
>
> Besides that, I did a test locally, it seems the job will recover from the
> latest checkpoint instead of a savepoint (even though it is the latest),
> and all the checkpoints will be properly cleaned up. Did you trigger the
> savepoint periodically? And did you clean that up manually?
>
>
> Best,
> Zakelly
>
> On Thu, Jan 2, 2025 at 3:50 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi,
>>
>> We noticed it first when running out of space on the S3 PV. We worked out
>> the subdirectory is the job Id, so I don't know how that is meant to be
>> cleaned up when the job is recreated with a new job Id . It might be we
>> need to work something out outside of flink.
>>
>> But we also noticed a huge number of subdirectories below that (under the
>> current job Id) , and I am not sure why they are not cleaned up.
>>
>> I do not remember anything about a state TTL, so it's probably not set.
>> Is that code ? a property? If so which one.
>>
>> Thanks
>>
>> JM
>>
>> On Thu, 2 Jan 2025, 06:38 Zakelly Lan, <zakelly....@gmail.com> wrote:
>>
>>> FW to user ML.
>>>
>>> Hi Jean-Marc,
>>>
>>> Could you elaborate more about how you noticed an increasing number of
>>> checkpoints are left behind? Is the number of subdirectories under
>>> s3://flink-application/checkpoints increasing? And have you set the state
>>> TTL?
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>>> On Tue, Dec 31, 2024 at 7:58 PM Jean-Marc Paulin <jm.pau...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are on Flink 1.20/Java17 running in a k8s environment, with
>>>>> checkpoints enabled on S3 and the following checkpoint options:
>>>>>
>>>>>     execution.checkpointing.dir: s3://flink-application/checkpoints
>>>>>     execution.checkpointing.externalized-checkpoint-retention:
>>>>> DELETE_ON_CANCELLATION
>>>>>     execution.checkpointing.interval: 150000 ms
>>>>>     execution.checkpointing.min-pause: 30000 ms
>>>>>     execution.checkpointing.mode: EXACTLY_ONCE
>>>>>     execution.checkpointing.savepoint-dir:
>>>>> s3://flink-application/savepoints
>>>>>     execution.checkpointing.timeout: 10 min
>>>>>     execution.checkpointing.tolerable-failed-checkpoints: "3"
>>>>>
>>>>> We have been through quite a few flink application restarts due to
>>>>> streaming failure for various reasons (mostly kafka related), but also
>>>>> flink application changes. The Flink application then tends to be resumed
>>>>> from savepoints, but we noticed an increasing number of checkpoints are
>>>>> left behind. Is there a built-in way of cleaning these obsolete 
>>>>> checkpoints?
>>>>>
>>>>> I suppose what we do not really understand is the condition(s) under
>>>>> which Flink may not clean up checkpoints. Can someone explain?
>>>>>
>>>>> Thanks
>>>>>
>>>>> JM
>>>>>
>>>>
