Shihuan Liu created FLINK-39307:
-----------------------------------
Summary: Improve checkpoint deletion speed in CompletedCheckpoint
Key: FLINK-39307
URL: https://issues.apache.org/jira/browse/FLINK-39307
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Shihuan Liu
In Flink, checkpoints have a retention limit, so after a new checkpoint
completes the job deletes an old one. Right now, to delete that old
checkpoint, the files within it are deleted sequentially:
{code:java}
discard()                                         // on ioExecutor thread
├─ metadataHandle.discardState()                  // fs.delete(file1, false)
├─ for each OperatorState:                        // e.g. 5 operators
│    └─ for each subtask:                         // e.g. 200 subtasks
│         └─ for each state handle:               // e.g. 1-3 handles per subtask
│              └─ FileStateHandle.discardState()  // fs.delete(fileN, false)
│                 ~"false" -> non-recursive, single-file deletion
└─ disposeStorageLocation()                       // fs.delete(emptyDir, false)
{code}
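For concreteness, here is a minimal, self-contained sketch of the per-file pattern above, with java.nio.file standing in for Flink's FileSystem abstraction (the class and method names are illustrative, not the actual Flink API; each Files.delete stands in for one fs.delete RPC to the storage backend):

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class PerFileDiscard {

    // Deletes every entry under the checkpoint directory one by one,
    // mirroring the nested fs.delete(fileN, false) calls in discard().
    // Against an object store this costs roughly one RPC per entry.
    // Returns the number of delete calls issued.
    static int discardPerFile(Path chkDir) throws IOException {
        int deleteCalls = 0;
        try (Stream<Path> tree = Files.walk(chkDir)) {
            // Reverse order so children are deleted before their parent dir.
            for (Path p : (Iterable<Path>) tree.sorted(Comparator.reverseOrder())::iterator) {
                Files.delete(p); // one "RPC" per file/directory
                deleteCalls++;
            }
        }
        return deleteCalls;
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-1010-");
        for (int i = 0; i < 5; i++) {
            Files.writeString(chk.resolve("state-" + i), "data");
        }
        // 5 files + the directory itself = 6 delete calls
        System.out.println("delete calls: " + discardPerFile(chk));
    }
}
{code}

With thousands of state files per checkpoint, this loop translates directly into thousands of round trips to the storage backend.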
In our company (Uber), we found that if a checkpoint has a large number of
files (many thousands), deletion is slow because the job has to delete the
files one by one, issuing one RPC call to the storage backend (e.g. OCI/GCS
servers) per file.
We are considering one potential improvement: delete the checkpoint directory
instead of deleting file by file, so the job just calls fs.delete on the
checkpoint directory (chk-1010, for example). This would allow the storage
layer's batch deletion API (within the GCS/OCI clients, for example) to kick
in, so multiple file deletion requests fit into a single RPC call, which
could significantly improve checkpoint cleanup performance. We can do this
for the full checkpointing mode, since in that mode all checkpoint files are
hosted under the chk-N folder.
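As a rough sketch of what the change could look like (again with java.nio.file standing in for Flink's FileSystem; the guard flag and method names below are hypothetical, not existing Flink API), the full-checkpoint path becomes a single recursive delete of the chk-N directory:

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class DirectoryDiscard {

    // Stand-in for fs.delete(chkDir, true): one recursive delete call.
    // Against GCS/OCI, the client could translate this into batched
    // delete-objects requests instead of one RPC per file.
    static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> tree = Files.walk(dir)) {
            for (Path p : (Iterable<Path>) tree.sorted(Comparator.reverseOrder())::iterator) {
                Files.delete(p);
            }
        }
    }

    // Directory deletion is only safe when every file of the checkpoint
    // lives under chk-N (full checkpointing mode). Incremental checkpoints
    // reference shared files outside the directory and would have to keep
    // the existing handle-by-handle discard. Returns true if the fast
    // path was taken.
    static boolean discard(Path chkDir, boolean isFullCheckpoint) throws IOException {
        if (!isFullCheckpoint) {
            return false; // caller falls back to per-handle deletion
        }
        deleteRecursively(chkDir);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-1010-");
        Files.createDirectory(chk.resolve("op-0"));
        Files.writeString(chk.resolve("op-0").resolve("subtask-0"), "state");
        Files.writeString(chk.resolve("_metadata"), "meta");
        System.out.println("fast path taken: " + discard(chk, true));
        System.out.println("dir still exists: " + Files.exists(chk));
    }
}
{code}

The key design question is the guard: we need a reliable way to know that no state handle points outside chk-N before taking the recursive-delete path.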
Any thoughts/recommendations?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)