[ 
https://issues.apache.org/jira/browse/FLINK-35769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35769:
-------------------------------
    Fix Version/s: 2.1.0
                       (was: 2.0.0)

> State files might not be deleted on task cancellation
> -----------------------------------------------------
>
>                 Key: FLINK-35769
>                 URL: https://issues.apache.org/jira/browse/FLINK-35769
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.1.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 2.1.0
>
>
> We have a job in an infinite (fast) restart loop, that’s crashing with a 
> serialization issue.
> The issue here is that each restart seems to leak state files (not cleaning 
> up ones from the previous run):
> {code:java}
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
> 7990
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
> 689{code}
> Eventually TM will use too much disk space.
>  
> The problem is in 
> [https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
> {code:java}
> try {
>             List<CompletableFuture<Void>> futures =
>                     transferAllStateDataToDirectoryAsync(downloadRequests, 
> internalCloser)
>                             .collect(Collectors.toList());
>             // Wait until either all futures completed successfully or one 
> failed exceptionally.
>             FutureUtils.completeAll(futures).get();
>         } catch (Exception e) {
>             downloadRequests.stream()
>                     .map(StateHandleDownloadSpec::getDownloadDestination)
>                     .map(Path::toFile)
>                     .forEach(FileUtils::deleteDirectoryQuietly); {code}
> Where {{FileUtils::deleteDirectoryQuietly}} will list the files and delete 
> them.
> But if {{completeAll}} is interrupted, then download runnable might re-create 
> it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to