Feng Jiajie created FLINK-33981: ----------------------------------- Summary: File Descriptor References Not Released After Job Execution in MiniCluster Mode Key: FLINK-33981 URL: https://issues.apache.org/jira/browse/FLINK-33981 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.18.0 Reporter: Feng Jiajie
When using MiniCluster mode, file descriptors for directories such as {{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not released after a job completes. Executing multiple jobs in the same JVM can therefore leave file descriptors behind, potentially leading to problems. After running the reproduction code provided below and waiting until it enters the final sleep, running {{lsof -p 18162}} reveals: {code:java} ... java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted) java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted) java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted) java 18162 sa_cluster 33r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted) java 18162 sa_cluster 34r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted) java 18162 sa_cluster 35r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted) java 18162 sa_cluster 36r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted) java 18162 sa_cluster 37r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted) java 18162 sa_cluster 38r DIR 253,1 0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted) ... 
{code} The code used for reproduction is as follows: {code:java} import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.graph.StreamGraph; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * javac -cp 'lib/*' TestReleaseFd.java * java -Xmx600m -cp '.:lib/*' TestReleaseFd */ public class TestReleaseFd { public static void main(String[] args) throws Exception { for (int i = 0; i < 10; ++i) { int round = i; Thread thread = new Thread(() -> { try { Configuration configuration = new Configuration(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(1); DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000); longDataStreamSource.addSink(new DiscardingSink<>()); StreamGraph streamGraph = env.getStreamGraph(); streamGraph.setJobName("test-" + System.nanoTime()); JobClient jobClient = env.executeAsync(streamGraph); CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult(); JobExecutionResult jobExecutionResult = null; while (jobExecutionResult == null) { try { jobExecutionResult = jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS); } catch (TimeoutException timeoutException) { // ignore } } System.out.println("finished round: " + round); env.close(); } catch (Exception e) { throw new RuntimeException(e); } }); thread.setDaemon(true); thread.start(); thread.join(); System.out.println("done ... 
" + i); } // ======================= lsof -p 18162 Thread.sleep(500_000_000); } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)