Hi Matthias, I think you didn't include the mailing list in your response.
According to my experiments, using last-state means the operator simply deletes the Flink pods, and I believe that doesn't count as Cancelled, so the artifacts for blobs and submitted job graphs are not cleaned up. I imagine the same logic Gyula mentioned before applies, namely keep the latest one and clean up the older ones.

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 10:37 Uhr schrieb Matthias Pohl <matthias.p...@aiven.io>:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancels your job and shuts down the cluster.
> Checkpoints are retained and can therefore be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding JobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. they survive the Flink
> cluster shutdown.
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created, and old ones (that are no longer
> needed) are cleaned up.
>
> Just to recap what I said before (to make it more explicit and to
> differentiate what the k8s operator does and what Flink does internally):
> removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you
> want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as its starting point. It's my understanding that this is how
>> the Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> Regards,
>> Alexis.
>>
>> Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <matthias.p...@aiven.io>:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file would mean Flink's failure
>>> recovery no longer works properly if the job is still executed but needs
>>> to be restarted, because the actual job definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> checkpoint directory. Deleting this file would be problematic if state
>>> recovery relies in some way on this specific checkpoint. The HA data
>>> relies on this file being present. Failover only fails if there's no
>>> newer checkpoint or the HA data still refers to this checkpoint in some
>>> way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...).
>>>
>>> In general, you don't want to clean up HA artifacts if the job hasn't
>>> reached a terminal state yet, as doing so harms Flink's ability to
>>> recover the job. Additionally, these files are connected with the HA
>>> backend, i.e. the file path is stored in the HA backend. Removing the
>>> artifacts will likely result in the metadata becoming invalid.
>>>
>>> What do you mean by "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the
>>> job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Then the explanation is likely that the job has not reached a terminal
>>>> state. I was testing updates *without* savepoints (but with HA), so I
>>>> guess that never triggers automatic cleanup.
>>>>
>>>> Since, in my case, the job will theoretically never reach a terminal
>>>> state with this configuration, would it cause issues if I clean up the
>>>> artifacts manually?
>>>>
>>>> *And for completeness, I also see an artifact called
>>>> completedCheckpointXYZ which is updated over time, and I imagine that
>>>> should never be removed.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> Am Mi., 7. Dez. 2022 um 13:03 Uhr schrieb Matthias Pohl <matthias.p...@aiven.io>:
>>>>
>>>>> Flink should already take care of cleaning up the artifacts you
>>>>> mentioned. Flink 1.15+ even includes retries if something went wrong.
>>>>> There are still a few bugs that need to be fixed (e.g. FLINK-27355
>>>>> [1]). Checkpoint HA data is not properly cleaned up yet, which is
>>>>> covered by FLIP-270 [2].
>>>>>
>>>>> It would be interesting to know why these artifacts haven't been
>>>>> deleted, assuming that the corresponding job is actually in a final
>>>>> state (e.g. FAILED, CANCELLED, FINISHED), i.e. there is a
>>>>> JobResultStoreEntry file for that specific job available in the folder
>>>>> Gyula was referring to in the linked documentation. At least for the
>>>>> JobGraph files, it's likely that you have additional metadata still
>>>>> stored in your HA backend (that refers to the files). That would be
>>>>> something you might want to clean up as well, if you want to do a
>>>>> proper cleanup. But still, it would be good to understand why these
>>>>> files are not cleaned up by Flink.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>>>>>
>>>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>>
>>>>>> One concrete question: under the HA folder I also see these sample
>>>>>> entries:
>>>>>>
>>>>>> - job_name/blob/job_uuid/blob_...
>>>>>> - job_name/submittedJobGraphX
>>>>>> - job_name/submittedJobGraphY
>>>>>>
>>>>>> Is it safe to clean these up when the job is in a healthy state?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>> Am Mo., 5. Dez. 2022 um 20:09 Uhr schrieb Alexis Sarda-Espinosa <sarda.espin...@gmail.com>:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> That certainly helps, but to set up automatic cleanup (in my case,
>>>>>>> of Azure Blob Storage), the ideal option would be to set a simple
>>>>>>> policy that deletes blobs that haven't been updated in some time.
>>>>>>> However, that would assume that anything actually relevant for the
>>>>>>> latest state is "touched" by the JM on every checkpoint, and since I
>>>>>>> also see blobs referencing "submitted job graphs", I imagine that
>>>>>>> might not be a safe assumption.
>>>>>>>
>>>>>>> I understand the life cycle of those blobs isn't directly managed by
>>>>>>> the operator, but in that regard it could make things more
>>>>>>> cumbersome.
>>>>>>>
>>>>>>> Ideally, Flink itself would guarantee this sort of allowable TTL for
>>>>>>> HA files, but I'm sure that's not trivial.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> There are some files that are not cleaned up over time in the HA
>>>>>>>> dir that need to be cleaned up by the user:
>>>>>>>>
>>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>>>>>>>
>>>>>>>> Hope this helps,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I see that the number of entries in the directory configured for
>>>>>>>>> HA increases over time, particularly in the context of job
>>>>>>>>> upgrades in a Kubernetes environment managed by the operator.
>>>>>>>>> Would it be safe to assume that any files which haven't been
>>>>>>>>> updated in a while can be deleted, given that the checkpointing
>>>>>>>>> interval is much smaller than the period used to determine whether
>>>>>>>>> files are too old?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Alexis.
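
[Editor's note] The rule that emerges from this thread — never touch HA artifacts of a job that has not reached a terminal state, and treat per-job directories (submitted job graphs, BLOBs) separately from the top-level completedCheckpoint files — can be sketched as a small script. This is a minimal illustration, not an official Flink tool: the function name `find_cleanup_candidates` is invented, the directory layout mirrors the `job_name/submittedJobGraphX` and `job_name/blob/job_uuid/blob_...` paths quoted above, and the set of terminal job IDs is assumed to come from checking JobResultStore entries as Matthias and Gyula describe. The caveat about HA backend metadata still referring to these files applies regardless.

```python
import os
import tempfile

def find_cleanup_candidates(ha_dir, terminal_job_ids):
    """List HA artifacts (submitted job graphs, BLOBs) belonging to jobs
    already known to be terminal. Artifacts of non-terminal jobs are
    skipped, since deleting them would break Flink's failover recovery."""
    candidates = []
    for entry in sorted(os.listdir(ha_dir)):
        job_path = os.path.join(ha_dir, entry)
        # Only per-job directories are considered; top-level files such as
        # completedCheckpointXYZ stay untouched, because the HA metadata
        # refers to them directly.
        if not os.path.isdir(job_path):
            continue
        if entry not in terminal_job_ids:
            continue
        for root, _dirs, files in os.walk(job_path):
            for name in sorted(files):
                candidates.append(os.path.join(root, name))
    return candidates

# Simulated HA directory with the layout discussed in the thread.
ha = tempfile.mkdtemp()
os.makedirs(os.path.join(ha, "job_a", "blob", "job_uuid"))
open(os.path.join(ha, "job_a", "submittedJobGraph0"), "w").close()
open(os.path.join(ha, "job_a", "blob", "job_uuid", "blob_p1"), "w").close()
open(os.path.join(ha, "completedCheckpoint123"), "w").close()

# Only "job_a" is assumed terminal here: its job graph and BLOB are
# flagged, while the top-level completedCheckpoint file never is.
print(find_cleanup_candidates(ha, {"job_a"}))
```

Note that in the "last-state" scenario Alexis describes, the job never reaches a terminal state by design, so the terminal set would stay empty and this sketch would (correctly) flag nothing.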