Hi Vidya,

Given that you are still on Flink 1.11, which was released in July 2020 and is no longer supported by the community, I would recommend first upgrading to a later, supported version such as Flink 1.16.
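
Until you are able to upgrade, one stopgap for the stale directories is to clear the local RocksDB working directory before a restarted TaskManager comes up, for example from an initContainer or a small wrapper around the TaskManager entrypoint. Below is a minimal sketch; it assumes the local directory is mounted at /mnt/rocksdb and that the per-instance directories carry the "job_" prefix, so adjust both to your own layout:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class StaleRocksDbCleanup {

    public static void main(String[] args) throws IOException {
        // Local RocksDB working directory (state.backend.rocksdb.localdir);
        // adjust to your mount path.
        Path localDir = Paths.get("/mnt/rocksdb");
        if (!Files.isDirectory(localDir)) {
            return;
        }
        // Every "job_*" instance directory left over from a previous run is
        // stale: a restarted backend always creates a fresh random-UUID dir.
        try (Stream<Path> entries = Files.list(localDir)) {
            entries.filter(p -> p.getFileName().toString().startsWith("job_"))
                   .forEach(StaleRocksDbCleanup::deleteRecursively);
        }
    }

    private static void deleteRecursively(Path dir) {
        try (Stream<Path> walk = Files.walk(dir)) {
            // Delete children before their parent directories.
            walk.sorted(Comparator.reverseOrder())
                .map(Path::toFile)
                .forEach(java.io.File::delete);
        } catch (IOException e) {
            System.err.println("Could not delete " + dir + ": " + e);
        }
    }
}

Since a restarted backend never reads the old directories, removing them before the TaskManager process starts is safe.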
Best regards,

Martijn

On Sat, Nov 12, 2022 at 8:07 PM Vidya Sagar Mula <mulasa...@gmail.com> wrote:

> Hi Yanfei,
>
> Thank you for the response. I have a follow-up answer and further
> questions.
>
> I have two setups: one in the local environment, and the other a
> deployment scenario on K8s.
>
> - In the K8s setup, I have a Volume on the cluster node, and a mount path
> is specified as the RocksDB checkpoint location. So, when the application
> TM pod is restarted, the older checkpoints are read back from the host
> path once the TM is up again.
> In this case, the RocksDB local directory is populated with all the older
> data, which is of no use to the job, since the "instanceBasePath" is
> calculated with a new random UUID.
>
> Questions:
> - What do you think: should the older files pulled from the host path to
> the mount path be deleted first, before the new instanceBasePath is
> created?
> Otherwise, we are going to end up with GBs of unwanted data.
>
> What is the general design recommendation in such cases, where the RocksDB
> local directory is a mount path to a Volume on the host node?
> Please clarify.
>
> Thanks,
> Vidya Sagar.
>
>
> On Thu, Nov 10, 2022 at 7:52 PM Yanfei Lei <fredia...@gmail.com> wrote:
>
>> Hi Vidya Sagar,
>>
>> Could you please share the reason for the TaskManager restart? If the
>> machine or the JVM process of the TaskManager crashes, the
>> `RocksDBKeyedStateBackend` can't be disposed/closed normally, so the
>> existing RocksDB instance directory would remain.
>>
>> BTW, if you use Application Mode on K8s and a TaskManager (pod) crashes,
>> the RocksDB directory would be deleted as the pod is released.
>>
>> Best,
>> Yanfei
>>
>> Vidya Sagar Mula <mulasa...@gmail.com> wrote on Fri, Nov 11, 2022 at
>> 01:39:
>>
>>> Hi,
>>>
>>> I am using the RocksDB state backend for incremental checkpointing with
>>> Flink 1.11.
>>>
>>> Question:
>>> ----------
>>> For a given job ID, intermediate RocksDB checkpoints are stored under
>>> the path defined with "state.backend.rocksdb.localdir".
>>>
>>> The files are stored under directories with "jobID + random UUID"
>>> prefixed to that location.
>>>
>>> Case 1:
>>> ---------
>>> - When I cancel the job, all the RocksDB checkpoints are deleted
>>> properly from the location corresponding to that job ID (based on the
>>> "instanceBasePath" variable stored in the RocksDBKeyedStateBackend
>>> object).
>>> No issue here; working as expected.
>>>
>>> Case 2:
>>> ---------
>>> - When my TaskManager is restarted, the existing RocksDB checkpoints
>>> are not deleted.
>>> A new "instanceBasePath" is constructed, with a new random UUID
>>> appended to the directory name.
>>> And the old checkpoint directories are still there.
>>>
>>> Questions:
>>> - Is this expected behaviour, not to delete the existing checkpoint
>>> dirs under the RocksDB local directory?
>>> - I see "StreamTaskStateInitializerImpl.java", where new StateBackend
>>> objects are created. In this case, a new directory is created for this
>>> job ID, appended with a new random UUID.
>>> What happens to the old directories? Are they going to be purged later
>>> on?
>>> If not, the disk is going to fill up with the older checkpoints.
>>> Please clarify this.
>>>
>>> Thanks,
>>> Vidya Sagar.
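
For reference, the local instance directory name in Flink 1.11 is derived roughly as in the sketch below; this is a simplified illustration with made-up IDs, not the exact Flink source. Because a fresh random UUID is drawn every time a keyed state backend is created, a restarted TaskManager never points at the directory a previous run left behind; only a clean dispose, as on job cancellation, deletes it.

import java.io.File;
import java.util.UUID;

public class InstanceBasePathSketch {
    public static void main(String[] args) {
        // Local working dir (state.backend.rocksdb.localdir); adjust as needed.
        File localDir = new File("/mnt/rocksdb");
        String jobId = "a1b2c3d4e5f6...";                   // illustrative job ID
        String operatorAndSubtask = "op_KeyedProcess__0_4"; // illustrative
        // A new UUID per backend instance: the name never matches a directory
        // left behind by a previous TaskManager run, so old ones linger.
        File instanceBasePath = new File(
                localDir,
                "job_" + jobId + "_" + operatorAndSubtask
                        + "__uuid_" + UUID.randomUUID());
        System.out.println(instanceBasePath);
    }
}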