[ https://issues.apache.org/jira/browse/FLINK-17860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Roman Khachatryan updated FLINK-17860:
--------------------------------------
    Description:
With a high degree of parallelism, we end up with n*s files in each checkpoint (n = parallelism, s = stages). Writing them is fast (done by many subtasks), but removing them is slow (done by the JM).
This can't be mitigated by state.backend.fs.memory-threshold because most states are tens to hundreds of MB.

Instead of going through them one by one, we could remove the directory recursively.

The easiest way is to remove the channelStateHandle.discard() calls and use isRecursive=true in FsCompletedCheckpointStorageLocation.disposeStorageLocation.
Note: with the current isRecursive=false, an exception is thrown if any files are left under that folder.

This can be extended to other state handles in the future as well.

  was:
With a high degree of parallelism, we end up with n*n files in each checkpoint. Writing them is fast (done by many subtasks), but removing them is slow (done by the JM).
This can't be mitigated by state.backend.fs.memory-threshold because most states are tens to hundreds of MB.

Instead of going through them one by one, we could remove the directory recursively.

The easiest way is to remove the channelStateHandle.discard() calls and use isRecursive=true in FsCompletedCheckpointStorageLocation.disposeStorageLocation.
Note: with the current isRecursive=false, an exception is thrown if any files are left under that folder.

This can be extended to other state handles in the future as well.

> Recursively remove channel state directories
> --------------------------------------------
>
>                 Key: FLINK-17860
>                 URL: https://issues.apache.org/jira/browse/FLINK-17860
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.11.0
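
The difference between the two disposal modes named in the description can be sketched with plain java.nio.file calls. This is only an illustration under stated assumptions: it does not use Flink's FileSystem API or FsCompletedCheckpointStorageLocation, and the class name, method names and file naming scheme below are hypothetical. It shows that a non-recursive delete of a checkpoint directory fails while files remain, whereas a recursive delete removes the directory without any per-handle discard() calls.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for checkpoint directory disposal; not Flink code.
public class CheckpointDirCleanup {

    /** Mimics isRecursive=false: succeeds only if the directory is already empty. */
    public static void disposeNonRecursive(Path checkpointDir) throws IOException {
        // Fails with an IOException (typically DirectoryNotEmptyException)
        // if any file is still present under the directory.
        Files.delete(checkpointDir);
    }

    /** Mimics isRecursive=true: removes the directory and everything below it. */
    public static void disposeRecursive(Path checkpointDir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Creates a temp directory holding n*s empty files (n = parallelism, s = stages). */
    public static Path createFakeCheckpoint(int parallelism, int stages) throws IOException {
        Path dir = Files.createTempDirectory("chk-");
        for (int stage = 0; stage < stages; stage++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                Files.createFile(dir.resolve("channel-state-" + stage + "-" + subtask));
            }
        }
        return dir;
    }

    public static void main(String[] args) throws IOException {
        Path dir = createFakeCheckpoint(4, 3);
        try {
            disposeNonRecursive(dir);
        } catch (IOException e) {
            System.out.println("non-recursive delete failed: " + e.getClass().getSimpleName());
        }
        disposeRecursive(dir);
        System.out.println("directory still exists: " + Files.exists(dir));
    }
}
```

On a local disk the recursive variant still touches every file, so the sketch demonstrates only the semantics, not the speed-up the issue is after. That gain depends on the underlying file system handling a recursive delete more cheaply than n*s individual delete requests issued from the JM.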