Forwarding to the user group again, since the mail server rejected it last time.

---------- Forwarded message ---------
From: shuwen zhou <jaco...@gmail.com>
Date: Wed, 13 Nov 2019 at 13:33
Subject: Re: RocksDB state on HDFS seems not being cleanned up
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>

Hi Yun,

After investigating, I found that the files are not orphan files: they are still recorded in the latest checkpoint's _metadata file.

I looked through the API you mentioned, https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html, and it seems that the state it can access is limited to user-defined state. I suspect the outdated state belongs to a window reduce state, so I would like to access window reduce state. It seems this API cannot provide that, can it?

On Thu, 7 Nov 2019 at 18:08, Yun Tang <myas...@live.com> wrote:

> Yes, just sum the sizes of all files within the checkpoint meta to get the full checkpoint size (this omits some byte stream state handles, but it is nearly accurate).
>
> BTW, I think the user mailing list is the better place for this thread, so I have already sent this mail to the user mailing list.
>
> Best
> Yun Tang
>
> From: shuwen zhou <jaco...@gmail.com>
> Date: Thursday, November 7, 2019 at 12:02 PM
> To: Yun Tang <myas...@live.com>
> Cc: dev <d...@flink.apache.org>, Till Rohrmann <trohrm...@apache.org>
> Subject: Re: RocksDB state on HDFS seems not being cleanned up
>
> Hi Yun,
>
> Thank you for your detailed explanation. It gives me a lot to research. I think:
>
> 1. I should try reducing "state.checkpoints.num-retained", maybe to 3, which could decrease the size of the shared folder.
>
> 2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is yes, maybe. I could use the state processor API you mentioned to figure it out and get back to you.
>
> 3. I had a look at the file /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata. It contains many file names like hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03. Summing those files' sizes gives the total size of each checkpoint, am I correct?
>
> 4. My checkpoint interval is 16 minutes.
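As a rough illustration of the size calculation discussed above (summing the files referenced by one checkpoint's _metadata), here is a minimal Scala sketch. It assumes the referenced paths and their sizes have already been extracted by some other means; `StateFile`, `CheckpointSize`, and `totalBytes` are hypothetical names, not Flink APIs, and the sample paths and sizes are made up. As noted in the thread, byte stream state handles stored inline in _metadata would not be counted this way.

```scala
// Hypothetical model of one file referenced from a checkpoint's _metadata.
final case class StateFile(path: String, sizeBytes: Long)

object CheckpointSize {
  // Sum the sizes of the distinct files referenced by one checkpoint.
  // A path that is listed more than once is counted a single time.
  def totalBytes(referenced: Seq[StateFile]): Long =
    referenced.map(f => f.path -> f.sizeBytes).toMap.values.sum

  def main(args: Array[String]): Unit = {
    // Illustrative entries only; real paths and sizes would come from HDFS.
    val files = Seq(
      StateFile("shared/file-a", 100L),
      StateFile("shared/file-b", 200L),
      StateFile("shared/file-a", 100L) // duplicate reference to the same file
    )
    println(s"full checkpoint size: ${totalBytes(files)} bytes")
  }
}
```

With incremental checkpoints, a number computed this way can be much larger than the per-checkpoint size shown in the UI, because files carried over from earlier checkpoints are included.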
>
> On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
>
> Hi Shuwen
>
> You have just 10 "chk-" folders, as expected, and when checkpoints are subsumed, the "chk-" folder is removed only after the shared state has been removed successfully [1]. That is to say, I think you might not have many orphan state files left. To verify this, you could use the state processor API [2] to load your checkpoints and compare them against all the files under the "shared" folder to see whether there are too many orphan files. If there are, we might consider the custom compaction filter feature of FRocksDB.
>
> Secondly, your estimate of "20 GB for each checkpoint" might not be accurate when RocksDB incremental checkpointing is enabled: the UI shows only the incremental size [3]. I suggest you sum the sizes of the files within your checkpoint meta to get the accurate size of each checkpoint.
>
> Last but not least, RocksDB's compaction filter feature deletes expired data only during compaction [4], so I'm afraid you might need to look at your RocksDB LOG file to see how frequently compaction runs on the task managers. I also think the increasing size might be related to your checkpoint interval. What interval do you use for checkpoints?
>
> [1] https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> [3] https://issues.apache.org/jira/browse/FLINK-13390
> [4] https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61
>
> Best
> Yun Tang
>
> From: shuwen zhou <jaco...@gmail.com>
> Date: Wednesday, November 6, 2019 at 12:02 PM
> To: dev <d...@flink.apache.org>, Yun Tang <myas...@live.com>, Till Rohrmann <trohrm...@apache.org>
> Subject: Re: RocksDB state on HDFS seems not being cleanned up
>
> Hi Yun and Till,
>
> Thank you for your response.
>
> For @Yun
>
> 1. No, I just renamed the checkpoint directory, since the directory name contains company data. Sorry for the confusion.
>
> 2. Yes, I set
>
>     state.checkpoints.num-retained: 10
>     state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
>
> in flink-conf.yaml.
>
> I was expecting that the shared folder would no longer contain outdated state. Since my TTL is set to 30 minutes, I shouldn't see data older than 1 day. However, I can still see such outdated data in the shared folder.
>
> For example, the current time is 2019-11-06 03:58:00 UTC, and I can see the following files on HDFS:
>
>     65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40
>     2.1 K   2019-11-04 17:28  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac
>     65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5
>     65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4
>     65.1 M  2019-11-05 17:42  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325
>
> 3. What I actually meant is that only the latest 10 checkpoints, each containing the full state, would be retained on HDFS; in my case, around 20 GB per checkpoint. That way I could control how much data is stored on HDFS, rather than having an ever-growing shared folder.
>
> But it takes a lot of time to store the full state on HDFS, so I would still like to use incremental checkpoints.
>
> For @Till
>
> I will give cleanupInRocksdbCompactFilter a try to see if it works. Thank you.
>
> On Wed, 6 Nov 2019 at 01:50, Yun Tang <myas...@live.com> wrote:
>
> @Till Rohrmann, I think just setting `cleanupInBackground()` should be enough for RocksDB to clean up in the compaction filter since Flink 1.9.0 [1].
>
> @Shuwen, I have several questions about your setup:
> 1. Is `flink-chk743e4568a70b626837b` the real folder for checkpoints? I don't think a job ID would look like this.
> 2. Why do you have 10 checkpoints left under the checkpoint folder? Did you configure the number of retained checkpoints as 10?
> 3. What do you mean by "while I disabled incremental cleanup, the expired full snapshot will be removed automatically"? I cannot see that you have configured the state TTL with `cleanupIncrementally()`. Moreover, what is the actual meaning of "removed automatically"?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
>
> Best
> Yun Tang
>
> On 11/5/19, 11:24 PM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
> Hi Shuwen,
>
> I think the problem is that you configured state TTL to clean up on full snapshots, which aren't executed when using RocksDB with incremental snapshots.
> Instead you need to activate `cleanupInRocksdbCompactFilter`:
>
>     val ttlConfig = StateTtlConfig
>       .newBuilder(Time.minutes(30))
>       .updateTtlOnCreateAndWrite()
>       .cleanupInBackground()
>       .cleanupInRocksdbCompactFilter()
>       .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>       .build()
>
> Cheers,
> Till
>
> On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou <jaco...@gmail.com> wrote:
>
> > Hi Jiayi,
> > I understand that the shared folder stores state for multiple checkpoints. I think the shared folder should only retain data across the number of checkpoints given by "state.checkpoints.num-retained" and remove outdated checkpoints, shouldn't it?
> > In my case, I suspect that the state of outdated checkpoints wasn't cleaned up, which makes the shared folder keep growing even after the TTL has passed.
> >
> > On Tue, 5 Nov 2019 at 21:13, bupt_ljy <bupt_...@163.com> wrote:
> >
> > > Hi Shuwen,
> > >
> > > "Shared" means that the state files are shared among multiple checkpoints, which happens when you enable incremental checkpointing [1]. Therefore, it's reasonable that the size keeps growing if you set "state.checkpoints.num-retained" to a large value.
> > >
> > > [1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > >
> > > Best,
> > > Jiayi Liao
> > >
> > > Original Message
> > > Sender: shuwen zhou <jaco...@gmail.com>
> > > Recipient: dev <d...@flink.apache.org>
> > > Date: Tuesday, Nov 5, 2019 17:59
> > > Subject: RocksDB state on HDFS seems not being cleanned up
> > >
> > > Hi Community,
> > >
> > > I have a job running on Flink 1.9.0 on YARN, with RocksDB on HDFS and incremental checkpointing enabled. I have some MapState in my code with the following config:
> > >
> > >     val ttlConfig = StateTtlConfig
> > >       .newBuilder(Time.minutes(30))
> > >       .updateTtlOnCreateAndWrite()
> > >       .cleanupInBackground()
> > >       .cleanupFullSnapshot()
> > >       .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> > >       .build()
> > >
> > > After running for around 2 days, I observed that the checkpoint folder shows:
> > >
> > >     44.4 M   /flink-chk743e4568a70b626837b/chk-40
> > >     65.9 M   /flink-chk743e4568a70b626837b/chk-41
> > >     91.7 M   /flink-chk743e4568a70b626837b/chk-42
> > >     96.1 M   /flink-chk743e4568a70b626837b/chk-43
> > >     48.1 M   /flink-chk743e4568a70b626837b/chk-44
> > >     71.6 M   /flink-chk743e4568a70b626837b/chk-45
> > >     50.9 M   /flink-chk743e4568a70b626837b/chk-46
> > >     90.2 M   /flink-chk743e4568a70b626837b/chk-37
> > >     49.3 M   /flink-chk743e4568a70b626837b/chk-38
> > >     96.9 M   /flink-chk743e4568a70b626837b/chk-39
> > >     797.9 G  /flink-chk743e4568a70b626837b/shared
> > >
> > > The ./shared folder keeps growing and does not seem to be cleaned up.
> > > However, while I disabled incremental cleanup, the expired full snapshot will be removed automatically.
> > > Is there any way to remove outdated state on HDFS to stop it from growing? Thanks.
> > >
> > > --
> > > Best Wishes,
> > > Shuwen Zhou

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
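
The orphan-file check suggested in the thread (comparing the files under "shared" with the files referenced by the retained checkpoints) comes down to a set difference. Below is a minimal Scala sketch of that comparison. It assumes the directory listing and the per-checkpoint references have already been collected, for example from an HDFS listing and from each retained _metadata file; `OrphanCheck` and `orphans` are hypothetical names, not Flink APIs, and the sample data is made up.

```scala
object OrphanCheck {
  // Files present under shared/ that no retained checkpoint references.
  // sharedListing: every path found under the shared/ folder.
  // referencedByCheckpoint: checkpoint id -> shared paths its _metadata lists.
  def orphans(
      sharedListing: Set[String],
      referencedByCheckpoint: Map[Long, Set[String]]
  ): Set[String] = {
    // Union of all paths referenced by any retained checkpoint.
    val referenced: Set[String] = referencedByCheckpoint.values.flatten.toSet
    sharedListing.diff(referenced)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative data only.
    val listing = Set("shared/a", "shared/b", "shared/c")
    val refs = Map(
      171L -> Set("shared/a"),
      172L -> Set("shared/a", "shared/b")
    )
    println(s"unreferenced files: ${orphans(listing, refs).mkString(", ")}")
  }
}
```

An empty result would match what was reported at the top of this thread: the old files are still referenced by the latest checkpoint, so they are retained state rather than orphans.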