Hi Yun,
Thank you for your detailed explanation; it gives me a lot to research. I
think:
1. I should try reducing "state.checkpoints.num-retained", maybe to
3, which could decrease the size of the shared folder.
2. Can Flink 1.9.0 leave orphan files behind? It seems the answer
is yes, maybe. I could use the State Processor API you mentioned to
figure it out and get back to you.
3. I had a look at the file
/flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata;
it contains a lot of file names
like
hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03.
Summing up those files' sizes gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.
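
For item 1, the change in my Flink configuration file would look roughly like the sketch below. The value 3 is only what I plan to try, not a tested recommendation, and the second key is simply the setting I already have:

```yaml
# Sketch of the relevant entries in the Flink configuration file.
# Retain fewer completed checkpoints (currently 10); 3 is an assumed trial value.
state.checkpoints.num-retained: 3
# Existing setting, unchanged.
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
```

With fewer retained checkpoints, fewer files under "shared" should remain referenced, though I still need to confirm how much that actually shrinks the folder.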





On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:

> Hi Shuwen
>
>
>
> You have just 10 “chk-” folders, as expected, and when subsuming
> checkpoints the “chk-” folder is removed only after the shared state has
> been removed successfully [1]. That is to say, I think you might not have too
> many orphan state files left. To verify this, you could use the State Processor
> API [2] to load your checkpoints and compare them against all the files under the
> “shared” folder to see whether there are too many orphan files. If there are,
> we might consider the custom compaction filter feature of FRocksDB.
>
>
>
> Secondly, your estimate of “20GB each checkpoint” might not be accurate
> when RocksDB incremental checkpointing is enabled: the UI shows only the
> incremental size [3]. I suggest summing the sizes of the files listed in your
> checkpoint metadata to get the accurate size of each checkpoint.
>
>
>
> Last but not least, RocksDB’s compaction filter feature deletes
> expired data only during compaction [4]. I’m afraid you might need
> to look at your RocksDB LOG file to see how often compaction runs on the
> task managers. I also think the growing size might be related to the
> interval of your checkpoints. What is the interval between your
> checkpoints?
>
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> [3] https://issues.apache.org/jira/browse/FLINK-13390
>
> [4]
> https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *shuwen zhou <jaco...@gmail.com>
> *Date: *Wednesday, November 6, 2019 at 12:02 PM
> *To: *dev <dev@flink.apache.org>, Yun Tang <myas...@live.com>, Till
> Rohrmann <trohrm...@apache.org>
> *Subject: *Re: RocksDB state on HDFS seems not being cleanned up
>
>
>
> Hi Yun and Till,
>
> Thank you for your response.
>
> For @Yun
> 1. No, I just renamed the checkpoint directory name since the directory
> name contains company data. Sorry for the confusion.
>
> 2. Yes, I set
>
> *state.checkpoints.num-retained: *10
> *state.backend.rocksdb.predefined-options: *FLASH_SSD_OPTIMIZED
>
> In flink.conf
>
> I was expecting that the shared folder would no longer contain outdated state:
> since my TTL is set to 30 mins, I shouldn't see data older than 1 day.
> However, I can still see such outdated data in the shared folder.
>
> For example, the current time is 2019-11-06 03:58:00 UTC, and I can see the
> following files on HDFS:
>
> 65.1 M 2019-11-04 17:58
> /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40
> 2.1 K 2019-11-04 17:28
> /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac
> 65.1 M 2019-11-04 17:58
> /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5
> 65.1 M 2019-11-04 17:58
> /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4
>
> 65.1 M 2019-11-05 17:42
> /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325
>
>
> 3. I actually meant that only the latest 10 checkpoints, each containing full
> state, would be retained on HDFS (in my case, around 20G per checkpoint). That
> way I could control how much data is stored on HDFS, rather than having
> an ever-growing shared folder.
>
> But it takes a lot of time to store full state on HDFS, so I would still
> like to use incremental checkpoints.
>
>
>
>
>
> For @Till
>
> I will give cleanupInRocksdbCompactFilter a try to see if it works. Thank
> you.
>
>
>
> On Wed, 6 Nov 2019 at 01:50, Yun Tang <myas...@live.com> wrote:
>
> @Till Rohrmann, I think just setting `cleanupInBackground()` should be enough
> for RocksDB to clean up in the compaction filter since Flink 1.9.0 [1].
>
> @Shuwen, I have several questions about your setup:
> 1. Is ` flink-chk743e4568a70b626837b` the real folder for checkpoints? I
> don't think a job-id would look like this.
> 2. Why do you have 10 checkpoints left under the checkpoint folder? Did you
> configure the number of retained checkpoints as 10?
> 3. What do you mean by "while I disabled incremental cleanup, the expired
> full snapshot will be removed automatically."? I cannot see that you have
> configured the state TTL with `cleanupIncrementally()`; moreover, what
> is the actual meaning of "removed automatically"?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
>
> Best
> Yun Tang
>
> On 11/5/19, 11:24 PM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
>     Hi Shuwen,
>
>     I think the problem is that you configured state ttl to clean up on full
>     snapshots which aren't executed when using RocksDB with incremental
>     snapshots. Instead you need to activate `cleanupInRocksdbCompactFilter`:
>
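>     import org.apache.flink.api.common.state.StateTtlConfig
>     import org.apache.flink.api.common.time.Time
>
>     val ttlConfig = StateTtlConfig
>       .newBuilder(Time.minutes(30))
>       .updateTtlOnCreateAndWrite()
>       .cleanupInBackground()
>       // purge expired entries while RocksDB compacts its files
>       .cleanupInRocksdbCompactFilter()
>       .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>       .build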
>
>     Cheers,
>     Till
>
>     On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou <jaco...@gmail.com> wrote:
>
>     > Hi Jiayi,
>     > I understand that the shared folder stores state shared by multiple
>     > checkpoints. I think the shared folder should only retain data for
>     > the last “state.checkpoints.num-retained” checkpoints and remove data
>     > from outdated checkpoints, shouldn't it?
>     > In my case I suspect that the outdated checkpoints' state wasn't cleaned
>     > up, which makes the shared folder keep growing even after the TTL has
>     > passed.
>     >
>     >
>     > On Tue, 5 Nov 2019 at 21:13, bupt_ljy <bupt_...@163.com> wrote:
>     >
>     > > Hi Shuwen,
>     > >
>     > >
>     > > The “shared” means that the state files are shared among multiple
>     > > checkpoints, which happens when you enable incremental
>     > > checkpointing [1]. Therefore, it’s reasonable that the size keeps
>     > > growing if you set “state.checkpoints.num-retained” to a big value.
>     > >
>     > >
>     > > [1]
>     > >
>     >
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>     > >
>     > >
>     > > Best,
>     > > Jiayi Liao
>     > >
>     > >
>     > >  Original Message
>     > > Sender: shuwen zhou<jaco...@gmail.com>
>     > > Recipient: dev<dev@flink.apache.org>
>     > > Date: Tuesday, Nov 5, 2019 17:59
>     > > Subject: RocksDB state on HDFS seems not being cleanned up
>     > >
>     > >
>     > > Hi Community,
>     > >
>     > > I have a job running on Flink 1.9.0 on YARN with RocksDB on HDFS
>     > > with incremental checkpoint enabled. I have some MapState in code
>     > > with the following config:
>     > >
>     > > val ttlConfig = StateTtlConfig
>     > >   .newBuilder(Time.minutes(30))
>     > >   .updateTtlOnCreateAndWrite()
>     > >   .cleanupInBackground()
>     > >   .cleanupFullSnapshot()
>     > >   .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>     > >   .build
>     > >
>     > > After running for around 2 days, I observed the checkpoint folder is
>     > > showing:
>     > >
>     > > 44.4 M  /flink-chk743e4568a70b626837b/chk-40
>     > > 65.9 M  /flink-chk743e4568a70b626837b/chk-41
>     > > 91.7 M  /flink-chk743e4568a70b626837b/chk-42
>     > > 96.1 M  /flink-chk743e4568a70b626837b/chk-43
>     > > 48.1 M  /flink-chk743e4568a70b626837b/chk-44
>     > > 71.6 M  /flink-chk743e4568a70b626837b/chk-45
>     > > 50.9 M  /flink-chk743e4568a70b626837b/chk-46
>     > > 90.2 M  /flink-chk743e4568a70b626837b/chk-37
>     > > 49.3 M  /flink-chk743e4568a70b626837b/chk-38
>     > > 96.9 M  /flink-chk743e4568a70b626837b/chk-39
>     > > 797.9 G /flink-chk743e4568a70b626837b/shared
>     > >
>     > > The ./shared folder size seems to keep increasing and the folder
>     > > does not seem to be cleaned up. However
>     > > while I disabled incremental cleanup, the expired full snapshot
>     > > will be removed automatically. Is there any way to remove outdated
>     > > state on HDFS to stop it from increasing? Thanks.
>     > >
>     > > --
>     > > Best Wishes,
>     > > Shuwen Zhou
>     >
>     >
>     >
>     > --
>     > Best Wishes,
>     > Shuwen Zhou
>     >
>
>
>
>
> --
>
> Best Wishes,
>
> Shuwen Zhou
>
>
>


-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
