[ https://issues.apache.org/jira/browse/FLINK-35769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-35769:
-------------------------------
    Fix Version/s: 2.1.0
                       (was: 2.0.0)

> State files might not be deleted on task cancellation
> -----------------------------------------------------
>
>                 Key: FLINK-35769
>                 URL: https://issues.apache.org/jira/browse/FLINK-35769
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.1.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 2.1.0
>
>
> We have a job in an infinite (fast) restart loop that is crashing with a
> serialization issue.
> Each restart seems to leak state files: the files from the previous run
> are not cleaned up.
> {code:java}
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
> 7990
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
> 689{code}
> Eventually the TM will use too much disk space.
>
> The problem is in
> [https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
> {code:java}
> try {
>     List<CompletableFuture<Void>> futures =
>             transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
>                     .collect(Collectors.toList());
>     // Wait until either all futures completed successfully or one failed exceptionally.
>     FutureUtils.completeAll(futures).get();
> } catch (Exception e) {
>     downloadRequests.stream()
>             .map(StateHandleDownloadSpec::getDownloadDestination)
>             .map(Path::toFile)
>             .forEach(FileUtils::deleteDirectoryQuietly); {code}
> Here {{FileUtils::deleteDirectoryQuietly}} lists the files in each download
> destination and deletes them.
> But if the wait on {{completeAll}} is interrupted, a download runnable might
> still re-create the directory after it has been deleted.
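
The ordering problem described above can be sketched with plain JDK classes. This is only an illustration under assumptions, not the fix applied in Flink: DownloadCleanupSketch, deleteRecursively, cleanupAfterAllDone and directoryLeftBehind are hypothetical names, and deleteRecursively merely stands in for FileUtils::deleteDirectoryQuietly. The idea shown is to delete the destination only after every download future has reached a terminal state, so that no task can write into the directory once it is gone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

// Hypothetical sketch; none of these names exist in Flink.
final class DownloadCleanupSketch {

    /** Deletes dir and everything below it (plain-JDK stand-in for a recursive delete helper). */
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.deleteIfExists(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Waits until every future has finished (normally or not), then deletes the destination. */
    static void cleanupAfterAllDone(List<CompletableFuture<Void>> futures, Path dir) {
        for (CompletableFuture<Void> future : futures) {
            try {
                future.join(); // unlike get(), join() does not throw InterruptedException
            } catch (RuntimeException ignored) {
                // A failed or cancelled future counts as finished for cleanup purposes.
            }
        }
        deleteRecursively(dir);
    }

    /** Runs one failing and one slow simulated download; returns true if the directory survives cleanup. */
    static boolean directoryLeftBehind() {
        Path base;
        try {
            base = Files.createTempDirectory("download-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Path dir = base.resolve("dest");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch failed = new CountDownLatch(1);
        try {
            CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
                failed.countDown();
                throw new IllegalStateException("simulated download failure");
            }, pool);
            CompletableFuture<Void> slow = CompletableFuture.runAsync(() -> {
                try {
                    failed.await(); // write only after the other download has already failed
                    Files.createDirectories(dir);
                    Files.write(dir.resolve("file.sst"), new byte[] {1});
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool);
            cleanupAfterAllDone(List.of(failing, slow), dir);
            return Files.exists(dir);
        } finally {
            pool.shutdownNow();
            deleteRecursively(base);
        }
    }

    public static void main(String[] args) {
        System.out.println("directory left behind: " + directoryLeftBehind());
    }
}
```

Note that the sketch never cancels the futures. CompletableFuture.cancel does not stop a task that is already running, so a cancelled future says nothing about whether its runnable has stopped writing; an actual fix would need some way to observe that the download tasks themselves have terminated.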