Roman Khachatryan created FLINK-35769:
-----------------------------------------
             Summary: State files might not be deleted on task cancellation
                 Key: FLINK-35769
                 URL: https://issues.apache.org/jira/browse/FLINK-35769
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.19.1
            Reporter: Roman Khachatryan
            Assignee: Roman Khachatryan
             Fix For: 1.20.0


We have a job stuck in a fast, infinite restart loop because it keeps crashing with a serialization issue.
Each restart appears to leak state files: the files from the previous run are not cleaned up.

{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l}}
{{7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l}}
{{689}}

Eventually, the TM will use too much disk space.

The problem is in [https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]:

{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    // Clean up every download destination.
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly);
    // ... (rest of the catch block not shown)
}
{code}

Here, {{FileUtils::deleteDirectoryQuietly}} lists the files in each download destination and deletes them. But if {{completeAll(futures).get()}} is interrupted (for example, on task cancellation), the catch block runs while the download runnables may still be in progress. A download runnable can then re-create the directory and its files after they have been deleted, and nothing cleans them up afterwards.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
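The race described in this issue can be reproduced outside Flink. The following is a minimal sketch under stated assumptions: it uses plain java.util.concurrent ({{ExecutorService}}, {{Future}}) instead of Flink's {{FutureUtils}}, {{CloseableRegistry}} and {{FileUtils}}, and every name in it ({{DownloadCleanupSketch}}, {{download}}, {{deleteQuietly}}, {{leaksFiles}}) is hypothetical. Each simulated download ignores interrupts and creates its destination directory before writing, an assumption standing in for a download runnable that "might re-create it". The sketch compares deleting the destination right after the waiting thread is interrupted with first stopping the downloads and waiting for them to terminate. It illustrates the ordering problem only; it is not the fix that was committed for FLINK-35769.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class DownloadCleanupSketch {
    static final int TASKS = 4;

    /** Sleeps while ignoring interrupts, like a blocking transfer that cannot be aborted. */
    static void sleepUninterruptibly(long millis) {
        boolean interrupted = false;
        long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        long left;
        while ((left = end - System.nanoTime()) > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(left);
            } catch (InterruptedException e) {
                interrupted = true; // remember the interrupt but keep "transferring"
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
    }

    /** Hypothetical stand-in for one state-file download into {@code dest}. */
    static Runnable download(Path dest, String name, CountDownLatch started) {
        return () -> {
            started.countDown();
            sleepUninterruptibly(300); // simulated slow remote read
            try {
                // Creating parent directories before writing re-creates dest if it was deleted.
                Files.createDirectories(dest);
                // java.io streams, unlike interruptible NIO channels, are not closed by interrupts.
                try (FileOutputStream out = new FileOutputStream(dest.resolve(name).toFile())) {
                    out.write(new byte[] {1, 2, 3});
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Best-effort recursive delete, in the spirit of FileUtils.deleteDirectoryQuietly. */
    static void deleteQuietly(Path dir) {
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException | UncheckedIOException ignored) {
            // "quietly": ignore missing or concurrently changing entries
        }
    }

    /** Starts downloads, simulates cancellation, cleans up; returns true if dest survives. */
    static boolean leaksFiles(Path dest, boolean waitBeforeCleanup) throws Exception {
        Files.createDirectories(dest);
        ExecutorService pool = Executors.newFixedThreadPool(TASKS);
        CountDownLatch started = new CountDownLatch(TASKS);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < TASKS; i++) {
            futures.add(pool.submit(download(dest, "file-" + i, started)));
        }
        started.await(); // every download is now in flight
        try {
            Thread.currentThread().interrupt(); // simulate cancellation interrupting the waiter
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException e) {
            // The flag is now cleared; real code would restore it or rethrow after cleanup.
            if (waitBeforeCleanup) {
                pool.shutdownNow(); // interrupt the downloads...
                pool.awaitTermination(10, TimeUnit.SECONDS); // ...and wait until they really stop
            }
            deleteQuietly(dest);
        }
        // Harness only: let leftover downloads finish, then inspect what they left behind.
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return Files.exists(dest);
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("flink-35769-sketch");
        System.out.println("delete immediately -> leaked: " + leaksFiles(base.resolve("a"), false));
        System.out.println("wait, then delete  -> leaked: " + leaksFiles(base.resolve("b"), true));
    }
}
```

Waiting on the futures alone would not be enough: once {{Future.cancel(...)}} has been called, {{isDone()}} reports true immediately even though the runnable may still be executing, and {{CompletableFuture.cancel}} does not interrupt the running task at all. That is why this sketch waits on the executor with {{awaitTermination}}, which returns only after the worker threads have finished their tasks.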