Feng Jiajie created FLINK-33981:
-----------------------------------

             Summary: File Descriptor References Not Released After Job 
Execution in MiniCluster Mode
                 Key: FLINK-33981
                 URL: https://issues.apache.org/jira/browse/FLINK-33981
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.18.0
            Reporter: Feng Jiajie


When using MiniCluster mode, file descriptors for directories such as 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a job completes. Executing multiple jobs in the same JVM can 
therefore leave file descriptors behind, potentially leading to problems.

After running the reproduction code provided below and waiting until it reaches 
the final sleep, running {{lsof -p 18162}} (18162 is the PID of the JVM) shows:
{code:java}
...
java    18162 sa_cluster   30r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR              253,1         0    1311979 
/tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:

 
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Reproduction program for FLINK-33981: runs ten jobs one after another in the same JVM, each
 * in its own thread, and then sleeps so that the file descriptors still held by the process
 * can be inspected with lsof.
 *
 * <p>Compile and run:
 * <pre>
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 * </pre>
 */
public class TestReleaseFd {

  /**
   * Runs ten rounds sequentially; each round builds a small streaming job, executes it, waits
   * for its result and closes the environment. Afterwards the main thread sleeps so the JVM
   * stays alive for inspection.
   *
   * @param args unused
   * @throws Exception if joining a worker thread or the final sleep is interrupted
   */
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; ++i) {
      // Effectively-final copy of the loop counter so the lambda can capture it.
      int round = i;
      Thread thread = new Thread(() -> {
        try {
          Configuration configuration = new Configuration();
          final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
          env.setParallelism(1);

          // Minimal pipeline: a bounded sequence 1..100000 fed into a sink that drops records.
          DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 
100000);
          longDataStreamSource.addSink(new DiscardingSink<>());

          // Job name includes System.nanoTime() so each round gets a different name.
          StreamGraph streamGraph = env.getStreamGraph();
          streamGraph.setJobName("test-" + System.nanoTime());
          JobClient jobClient = env.executeAsync(streamGraph);

          // Wait for the job result in 20-second slices, retrying after every timeout.
          CompletableFuture<JobExecutionResult> 
jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
          JobExecutionResult jobExecutionResult = null;
          while (jobExecutionResult == null) {
            try {
              jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
            } catch (TimeoutException timeoutException) {
              // ignore: the result is not available yet, keep waiting
            }
          }
          System.out.println("finished round: " + round);
          // The environment is closed once the job has finished; only reached when no
          // exception was thrown above.
          env.close();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      // join() blocks until this round's thread ends, so rounds never overlap.
      thread.setDaemon(true);
      thread.start();
      thread.join();

      System.out.println("done ... " + i);
    }
    
    // Keep the JVM alive so the open file descriptors can be listed from another shell.
    // 18162 below is the process id used in the lsof command quoted in the report.
    // ======================= lsof -p 18162
    Thread.sleep(500_000_000);
  }
}
 {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to