Hi Matthias,

I think you didn't include the mailing list in your response.

According to my experiments, using last-state means the operator simply
deletes the Flink pods, and I believe that doesn't count as Cancelled, so
the artifacts for blobs and submitted job graphs are not cleaned up. I
imagine the same logic Gyula mentioned before applies, namely keeping the
latest entry and cleaning up the older ones.

Regards,
Alexis.

On Thu, Dec 8, 2022 at 10:37 AM Matthias Pohl <
matthias.p...@aiven.io>:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancels your job and shuts down the cluster.
> Checkpoints are retained and, therefore, can be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding jobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. survive the Flink cluster
> shutdown.
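>
> As a rough sketch of the configuration this relies on (untested, and the
> storage paths below are just placeholders for whatever you actually use):
>
>   # retain checkpoints on cancellation so "last state" can pick them up
>   execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>   state.checkpoints.dir: abfss://<container>@<account>.dfs.core.windows.net/flink/checkpoints
>   # Kubernetes HA services persist job metadata under the HA storage dir
>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>   high-availability.storageDir: abfss://<container>@<account>.dfs.core.windows.net/flink/ha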
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created and old ones (that are not needed
> anymore) are cleaned up.
>
> Just to recap what I said before (to differentiate more explicitly between
> what the k8s operator does and what Flink does internally):
> Removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as a starting point. It's my understanding that this is how the
>> Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
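>>
>> For concreteness, a minimal sketch of the kind of spec I mean (the names
>> and image tag are made up for illustration):
>>
>>   apiVersion: flink.apache.org/v1beta1
>>   kind: FlinkDeployment
>>   metadata:
>>     name: my-job  # hypothetical
>>   spec:
>>     image: my-registry/my-job:2.0  # bumping this tag triggers the upgrade
>>     flinkVersion: v1_15
>>     serviceAccount: flink
>>     job:
>>       jarURI: local:///opt/flink/usrlib/my-job.jar
>>       upgradeMode: last-state  # resume from the latest checkpoint
>>
>> As far as I understand, last-state also requires Kubernetes HA to be
>> enabled in the flinkConfiguration section for this to work.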
>>
>> Regards,
>> Alexis.
>>
>> On Wed, Dec 7, 2022 at 3:54 PM Matthias Pohl <
>> matthias.p...@aiven.io>:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file breaks Flink's failure recovery:
>>> if the job is still running but needs to be restarted, the actual job
>>> definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> checkpoint directory. The HA data relies on this file being present, so
>>> deleting it is problematic if state recovery still depends on this specific
>>> checkpoint. Failover only fails if there is no newer checkpoint or the HA
>>> data still refers to this checkpoint in some way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...)
>>>
>>> In general, you don't want to clean HA artifacts if the job hasn't reached
>>> a terminal state yet, as doing so harms Flink's ability to recover the job.
>>> Additionally, these files are connected with the HA backend, i.e. their
>>> file paths are stored in the HA backend. Removing the artifacts will likely
>>> result in that metadata becoming invalid.
>>>
>>> What do you mean by "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Then the explanation is likely that the job has not reached a terminal
>>>> state. I was testing updates *without* savepoints (but with HA), so I guess
>>>> that never triggers automatic cleanup.
>>>>
>>>> Since, in my case, the job will theoretically never reach a terminal
>>>> state with this configuration, would it cause issues if I clean the
>>>> artifacts manually?
>>>>
>>>> *And for completeness, I also see an artifact called
>>>> completedCheckpointXYZ which is updated over time, and I imagine that
>>>> should never be removed.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> On Wed, Dec 7, 2022 at 1:03 PM Matthias Pohl <
>>>> matthias.p...@aiven.io>:
>>>>
>>>>> Flink should already take care of cleaning the artifacts you mentioned.
>>>>> Flink 1.15+ even includes retries if something goes wrong. There are
>>>>> still a few bugs that need to be fixed (e.g. FLINK-27355 [1]). Checkpoint
>>>>> HA data is not properly cleaned up yet, which is covered by FLIP-270 [2].
>>>>>
>>>>> It would be interesting to know why these artifacts haven't been deleted,
>>>>> assuming that the corresponding job is actually in a final state (e.g.
>>>>> FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry file for
>>>>> that specific job available in the folder Gyula was referring to in the
>>>>> linked documentation. At least for the JobGraph files, it's likely that
>>>>> you have additional metadata still stored in your HA backend (that refers
>>>>> to the files). That would be something you might want to clean up as
>>>>> well, if you want to do a proper cleanup. But still, it would be good to
>>>>> understand why these files are not cleaned up by Flink.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>>>> [2]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>>>>>
>>>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
>>>>> sarda.espin...@gmail.com> wrote:
>>>>>
>>>>>> One concrete question, under the HA folder I also see these sample
>>>>>> entries:
>>>>>>
>>>>>> - job_name/blob/job_uuid/blob_...
>>>>>> - job_name/submittedJobGraphX
>>>>>> - job_name/submittedJobGraphY
>>>>>>
>>>>>> Is it safe to clean these up when the job is in a healthy state?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>> On Mon, Dec 5, 2022 at 8:09 PM Alexis Sarda-Espinosa <
>>>>>> sarda.espin...@gmail.com>:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> That certainly helps, but to set up automatic cleanup (in my case, of
>>>>>>> Azure Blob Storage), the ideal option would be a simple lifecycle policy
>>>>>>> that deletes blobs that haven't been updated in some time. That would
>>>>>>> assume, however, that anything still relevant for the latest state is
>>>>>>> "touched" by the JM on every checkpoint, and since I also see blobs
>>>>>>> referencing "submitted job graphs", I imagine that is not a safe
>>>>>>> assumption.
>>>>>>>
>>>>>>> I understand the life cycle of those blobs isn't directly managed by
>>>>>>> the operator, but in that regard the operator could make cleanup more
>>>>>>> cumbersome.
>>>>>>>
>>>>>>> Ideally, Flink itself would guarantee this sort of allowable TTL for
>>>>>>> HA files, but I'm sure that's not trivial.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra, <gyula.f...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> There are some files in the HA dir that are not cleaned up over time
>>>>>>>> and need to be removed by the user:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
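>>>>>>>>
>>>>>>>> For reference, the JobResultStore options behind this behavior (this
>>>>>>>> is a sketch from memory, so please double-check the docs for your
>>>>>>>> Flink and operator versions):
>>>>>>>>
>>>>>>>>   # the operator keeps entries so it can observe terminal job state;
>>>>>>>>   # that's why they accumulate and must be cleaned up by the user
>>>>>>>>   job-result-store.delete-on-commit: false
>>>>>>>>   # entries live under the HA storage dir by default, e.g.:
>>>>>>>>   # <high-availability.storageDir>/job-result-store/<cluster-id>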
>>>>>>>>
>>>>>>>>
>>>>>>>> Hope this helps
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>>>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I see the number of entries in the directory configured for HA
>>>>>>>>> increases over time, particularly in the context of job upgrades in a
>>>>>>>>> Kubernetes environment managed by the operator. Would it be safe to
>>>>>>>>> assume that any files that haven't been updated in a while can be
>>>>>>>>> deleted, assuming the checkpointing interval is much smaller than the
>>>>>>>>> period used to determine whether files are too old?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Alexis.
>>>>>>>>>
>>>>>>>>>
