Hi all,

While working on a streaming application built with Flink, I ran into an issue and would like to ask for advice. First, our application's key configurations are as follows:
flink version: 1.17.0
state.backend: "rocksdb"
state.backend.incremental: "true"
state.backend.changelog.enabled: "true"
state.backend.changelog.storage: "filesystem"
state.backend.changelog.periodic-materialize.interval: "10m"

Ordinarily, cleanup of the changelog and materialized state works fine: materialization triggers deletion of the previous changelog, and when compaction happens the old state files are discarded soon after. Consequently, the total number of files generated by our application stays stable.

However, when the job restarts due to a checkpoint failure or any other reason, neither the changelog files nor the materialized state files are cleaned up, leaving an excessive number of files in our checkpoint storage (HDFS). Is this a known issue or expected behavior?

To address this, I am considering periodically deleting the changelog files and the materialized state files (the files in the taskowned/ directory) based on their modification time. However, I wanted to ask whether this approach is safe.

Thanks in advance.

Best regards,
dongwoo
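P.S. For concreteness, the mtime-based cleanup I have in mind would look roughly like the sketch below. It uses the local filesystem as a stand-in for HDFS (in practice it would go through `hdfs dfs -rm` or an HDFS client), and the retention window is just a placeholder, so please treat it only as an illustration of the idea:

```python
import time
from pathlib import Path

def cleanup_old_files(directory: str, retention_seconds: float) -> list:
    """Delete files under `directory` whose modification time is older
    than the retention window; return the paths that were removed.

    NOTE: this is only a sketch of the idea. Blindly deleting files from
    checkpoint storage could break recovery if a retained checkpoint or
    the latest materialization still references them -- which is exactly
    what I am asking about.
    """
    now = time.time()
    removed = []
    for path in Path(directory).rglob("*"):
        if path.is_file() and now - path.stat().st_mtime > retention_seconds:
            path.unlink()
            removed.append(str(path))
    return removed
```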