Hi Jiang,

I'm afraid I might misunderstand what you mean, so could you elaborate on how you want to change it? For example, which interface or class would you add a method to? Although I am not a state expert, as far as I know, because of incremental checkpoints it is necessary to call the discardState method of each state object when a CompletedCheckpoint is being discarded.
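To make the incremental-checkpoint concern concrete, here is a minimal, hypothetical model (not Flink's API; the class `SharedStateSketch` and its methods are made up for illustration). It assumes that incremental checkpoints can reuse files uploaded by earlier checkpoints, so a file may only be deleted once no retained checkpoint references it. Deleting everything a subsumed checkpoint ever pointed to would remove files that a newer checkpoint still needs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of reference-counted shared state (NOT Flink's API).
class SharedStateSketch {
    // file name -> number of retained checkpoints that reference it
    private final Map<String, Integer> refCounts = new HashMap<>();
    // simulated contents of the remote checkpoint storage
    final Set<String> storage = new HashSet<>();
    // number of remote delete calls issued so far
    int deletes = 0;

    /** Registers a completed checkpoint that references the given files. */
    void register(List<String> files) {
        for (String f : files) {
            storage.add(f);
            refCounts.merge(f, 1, Integer::sum);
        }
    }

    /** Discards a checkpoint: only files no longer referenced are deleted. */
    void discard(List<String> files) {
        for (String f : files) {
            Integer count = refCounts.get(f);
            if (count == null) {
                continue; // not tracked, nothing to do
            }
            if (count == 1) {
                refCounts.remove(f);
                storage.remove(f);
                deletes++; // one remote delete per unreferenced file
            } else {
                refCounts.put(f, count - 1);
            }
        }
    }

    public static void main(String[] args) {
        SharedStateSketch s = new SharedStateSketch();
        s.register(List.of("shared/sst-1", "shared/sst-2")); // chk-1
        s.register(List.of("shared/sst-2", "shared/sst-3")); // chk-2 reuses sst-2
        s.discard(List.of("shared/sst-1", "shared/sst-2"));  // chk-1 is subsumed
        System.out.println(s.storage.contains("shared/sst-2")); // true: chk-2 still needs it
    }
}
```

The per-file walk is what makes the reference counting possible: each file's count has to be checked individually before it can be removed.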
Best,
Guowei

On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
> Hello Flink!
>
> We are building an infrastructure where we implement our own
> CompletedCheckpointStore. Reads and writes to the external storage
> location of these checkpoints go through HTTP calls to an external service.
>
> Recently we noticed a checkpoint file cleanup performance issue when
> the job writes out a very high number of checkpoint files per checkpoint
> (in our case, a few hundred operators running with a parallelism of 16).
> During the checkpoint state discard phase, since the implementation in
> CompletedCheckpoint discards the state files one by one, we are seeing a
> very high number of remote calls. Sometimes the deletion fails to keep up
> with the checkpoint progress.
>
> Given that the interface we use to configure the external storage
> location for checkpoints always takes a `target directory`, would it be
> reasonable to expose an implementation of discard() that directly calls
> disposeStorageLocation with recursive set to true, without first iterating
> over each individual file? Are there any blockers for that?
>
> Thank you!
>
>
> Links:
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L240
>
> https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L70
>
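The cost difference behind the question can be sketched by counting remote calls. This is a hypothetical model only: `DiscardStrategies`, `CountingRemoteStore`, `deleteFile` and `deleteRecursive` are invented names, every method call stands for one HTTP request, and the sizes (300 operators x 16 subtasks, one file each) are chosen purely for illustration, not taken from a real job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical comparison of discard strategies (NOT Flink's API).
class DiscardStrategies {

    // Hypothetical remote store: each method call models one HTTP request.
    static class CountingRemoteStore {
        final Set<String> paths = new HashSet<>();
        int calls = 0;

        void deleteFile(String path) {
            calls++;
            paths.remove(path);
        }

        void deleteRecursive(String dirPrefix) {
            calls++;
            paths.removeIf(p -> p.startsWith(dirPrefix));
        }
    }

    // Behaviour described in the thread: one delete per state file,
    // then one more call to drop the (now empty) checkpoint directory.
    static void discardOneByOne(CountingRemoteStore store, String dir, List<String> files) {
        for (String f : files) {
            store.deleteFile(f);
        }
        store.deleteRecursive(dir);
    }

    // Proposed behaviour: a single recursive delete of the checkpoint directory.
    // Only safe for files that this checkpoint owns exclusively.
    static void discardRecursive(CountingRemoteStore store, String dir) {
        store.deleteRecursive(dir);
    }

    public static void main(String[] args) {
        // Illustrative sizes only: 300 operators x 16 subtasks, one file each.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 300 * 16; i++) {
            files.add("chk-42/file-" + i);
        }

        CountingRemoteStore a = new CountingRemoteStore();
        a.paths.addAll(files);
        discardOneByOne(a, "chk-42/", files);

        CountingRemoteStore b = new CountingRemoteStore();
        b.paths.addAll(files);
        discardRecursive(b, "chk-42/");

        System.out.println("one-by-one calls: " + a.calls); // 4801
        System.out.println("recursive calls: " + b.calls);  // 1
    }
}
```

The model deliberately ignores shared state. As far as I understand Flink's filesystem layout, files shared between incremental checkpoints are written outside a single checkpoint's exclusive directory and are reference counted, so a recursive delete could at most replace the per-file deletes for exclusively owned state.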