[ https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364760#comment-17364760 ]
Yanfei Lei commented on FLINK-23003:
------------------------------------

Sure, opened. [https://github.com/apache/flink/pull/16176] [~roman_khachatryan] would you please review it again?

> Resource leak in RocksIncrementalSnapshotStrategy
> -------------------------------------------------
>
> Key: FLINK-23003
> URL: https://issues.apache.org/jira/browse/FLINK-23003
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: Flink: 1.14-SNAPSHOT
> Reporter: Yanfei Lei
> Assignee: Yanfei Lei
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
> We found that the {{RocksDBStateUploader}} in {{RocksIncrementalSnapshotStrategy}} is not closed correctly after being used, which leads to a resource leak. {{RocksDBStateUploader}} inherits from {{RocksDBStateDataTransfer}}, which holds an {{ExecutorService}}; {{RocksDBStateUploader}} uses this {{ExecutorService}} to upload files to DFS asynchronously.
> When {{RocksDBKeyedStateBackend}} is cleaned up, all resources held by the backend should be closed, but {{RocksIncrementalSnapshotStrategy}} currently lacks a close() function.
> We ran into a concrete instance of this problem: while benchmarking the performance of incremental rescaling, we observed that the forked VM of JMH could not exit normally.
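The leak pattern described in the quoted text can be illustrated with a minimal, self-contained sketch. This is plain JDK code, not Flink's actual classes; `uploaderPool` merely stands in for the executor held by {{RocksDBStateDataTransfer}}:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal sketch (not Flink code) of the leak: a component holds an
// ExecutorService whose non-daemon worker threads keep parking in
// LinkedBlockingQueue.take() after their tasks finish. Unless shutdown()
// is called, those threads prevent the JVM from exiting.
public class ExecutorLeakSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService uploaderPool = Executors.newFixedThreadPool(4);
        uploaderPool.submit(() -> { /* pretend to upload a file to DFS */ });
        Thread.sleep(200); // let the task finish

        // The worker thread is still alive, blocked waiting for more tasks --
        // the same Thread[pool-N-thread-M] stacks that JMH reported.
        long leaked = Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith("pool-"))
                .count();
        System.out.println("leaked pool threads: " + leaked);

        // Without this shutdown, a plain (non-forcibly-killed) JVM would
        // hang at exit instead of terminating.
        uploaderPool.shutdown();
        uploaderPool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("pool terminated: " + uploaderPool.isTerminated());
    }
}
```

Running this shows one parked `pool-*` worker thread surviving its task; JMH's "non-finished threads" warning in the log below is the same symptom, observed at fork shutdown.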
> {code:java}
> [INFO]
> [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark ---
> # JMH version: 1.19
> # VM version: JDK 1.8.0_281, VM 25.281-b09
> # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Warmup: 10 iterations, 1 s each
> # Measurement: 10 iterations, 1 s each
> # Timeout: 10 min per iteration
> # Threads: 1 thread, will synchronize iterations
> # Benchmark mode: Average time, time/op
> # Benchmark: org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp
> # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)
> # Run progress: 0.00% complete, ETA 00:02:00
> # Fork: 1 of 3
> # Warmup Iteration 1: 244.717 ms/op
> # Warmup Iteration 2: 104.749 ms/op
> # Warmup Iteration 3: 104.182 ms/op
> ...
> Iteration 1: 96.600 ms/op
> Iteration 2: 108.463 ms/op
> Iteration 3: 93.657 ms/op
> ...
> <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 24 seconds more...>
> Non-finished threads:
> ...
> Thread[pool-15-thread-4,5,main]
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> <shutdown timeout of 30 seconds expired, forcing forked VM to exit>
> {code}
>
> The root cause of this example is that the {{RocksDBStateUploader}} in {{RocksIncrementalSnapshotStrategy}} is not closed normally when {{RocksDBKeyedStateBackend}} is disposed.
>
> The solution to this problem is quite straightforward: the {{RocksDBStateUploader}} in {{RocksIncrementalSnapshotStrategy}} can be closed when cleaning up {{RocksDBKeyedStateBackend}}.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
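The direction of the fix can be sketched as follows. The class and method names here are illustrative stand-ins, not Flink's actual API: the snapshot strategy gains a close() that closes its uploader, whose close() in turn shuts down the internal {{ExecutorService}}, and the backend's dispose path calls strategy.close().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RocksDBStateUploader: owns a thread pool
// and releases it in close().
class UploaderSketch implements AutoCloseable {
    final ExecutorService executor = Executors.newFixedThreadPool(4);

    void uploadAsync(Runnable upload) {
        executor.submit(upload); // pretend DFS upload
    }

    @Override
    public void close() throws InterruptedException {
        executor.shutdown(); // stop accepting work, let in-flight uploads finish
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

// Hypothetical stand-in for RocksIncrementalSnapshotStrategy: the
// close() method sketched here is what the report says was missing.
class SnapshotStrategySketch implements AutoCloseable {
    final UploaderSketch uploader = new UploaderSketch();

    @Override
    public void close() throws InterruptedException {
        uploader.close();
    }
}

public class FixSketch {
    public static void main(String[] args) throws Exception {
        SnapshotStrategySketch strategy = new SnapshotStrategySketch();
        strategy.uploader.uploadAsync(() -> { /* upload some SST file */ });
        strategy.close(); // dispose path now releases the thread pool
        System.out.println("uploader pool terminated: "
                + strategy.uploader.executor.isTerminated());
    }
}
```

With the strategy closed on dispose, no parked worker threads survive, so a forked JMH VM (or any JVM) can exit normally.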