Libin Qin created FLINK-18803:
---------------------------------

             Summary: JobGraph cannot be GC when submit via 
RemoteStreamEnvironment in attach mode 
                 Key: FLINK-18803
                 URL: https://issues.apache.org/jira/browse/FLINK-18803
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.7.2
            Reporter: Libin Qin
         Attachments: image-2020-08-03-18-01-56-062.png, 
image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png, 
image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png, 
image-2020-08-03-18-12-29-467.png

When a job is submitted through RemoteStreamEnvironment in attached mode, the 
client submission thread blocks on "jobResultFuture.get()" in the "submitJob" 
method of RestClusterClient.java. While it is blocked, the local variable 
jobGraph is still referenced, so the JobGraph cannot be garbage collected. If 
the job is complex, with many vertices and edges, or if the client submits a 
large number of jobs, the retained JobGraphs become large and the client may 
run out of memory (OOM). I don't think the client needs to hold on to the 
JobGraph after submission.

The largest objects in the client heap are shown below. This job has more than 
408 tasks.

!image-2020-08-03-18-03-29-748.png!

 

!image-2020-08-03-18-08-50-811.png!

 

 

Perhaps we can null it out after the submission succeeds:

 
{code:java}
public JobSubmissionResult run(FlinkPlan compiledPlan,
      List<URL> libraries, List<URL> classpaths, ClassLoader classLoader,
      SavepointRestoreSettings savepointSettings)
      throws ProgramInvocationException {
   return submitJob(
      () -> getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings),
      classLoader);
}


public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier,
      ClassLoader classLoader) throws ProgramInvocationException {
   JobGraph jobGraph = jobGraphSupplier.get();
   JobID jobID = jobGraph.getJobID();
   log.info("Submitting job {} (detached: {}).", jobID, isDetached());

   final CompletableFuture<JobSubmissionResult> jobSubmissionFuture =
      submitJob(jobGraph);
   JobSubmissionResult result;
   try {
      result = jobSubmissionFuture.get();
      // Help GC: the graph is no longer needed once submission has succeeded.
      jobGraph = null;
   } catch (Exception e) {
      throw new ProgramInvocationException("Could not submit job",
         jobID, ExceptionUtils.stripExecutionException(e));
   }

   if (isDetached()) {
      return result;
   } else {
      final CompletableFuture<JobResult> jobResultFuture =
         requestJobResult(jobID);
      final JobResult jobResult;
      try {
         jobResult = jobResultFuture.get();
      } catch (Exception e) {
         throw new ProgramInvocationException("Could not retrieve the execution result.",
            jobID, ExceptionUtils.stripExecutionException(e));
      }

      try {
         this.lastJobExecutionResult =
            jobResult.toJobExecutionResult(classLoader);
         return lastJobExecutionResult;
      } catch (JobExecutionException e) {
         throw new ProgramInvocationException("Job failed.", jobID, e);
      } catch (IOException | ClassNotFoundException e) {
         throw new ProgramInvocationException("Job failed.", jobID, e);
      }
   }
}
{code}
 

With this change, we can see that the JobGraph has been garbage collected:

!image-2020-08-03-18-10-08-353.png!
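
As a possible alternative to nulling the local variable, the graph could be 
confined to a short-lived helper method, so that the frame which blocks on the 
job result never holds a reference to it. The following is only a minimal, 
self-contained sketch of that idea. BigGraph, submit, requestResult, 
buildAndSubmit and runAttached are hypothetical stand-ins invented for 
illustration; they are not Flink classes or methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-ins for illustration only; none of this is Flink API. */
final class ScopedSubmissionSketch {

    /** Stand-in for a large job graph. */
    static final class BigGraph {
        final String id;
        final byte[] payload;

        BigGraph(String id, int payloadBytes) {
            this.id = id;
            this.payload = new byte[payloadBytes];
        }
    }

    /** Stand-in for the asynchronous submission; completes with the job id. */
    static CompletableFuture<String> submit(BigGraph graph) {
        return CompletableFuture.completedFuture(graph.id);
    }

    /** Stand-in for requesting the final job result by id. */
    static CompletableFuture<String> requestResult(String jobId) {
        return CompletableFuture.completedFuture(jobId + " finished");
    }

    /**
     * Builds and submits the graph. The graph reference exists only in this
     * frame, so the caller receives just the small job id.
     */
    static String buildAndSubmit(Supplier<BigGraph> graphSupplier) {
        BigGraph graph = graphSupplier.get();
        return submit(graph).join();
    }

    /** Attached-mode flow: the blocking wait happens without a graph reference. */
    static String runAttached(Supplier<BigGraph> graphSupplier) {
        String jobId = buildAndSubmit(graphSupplier);
        // Only jobId is held here while waiting for the result.
        return requestResult(jobId).join();
    }

    public static void main(String[] args) {
        System.out.println(runAttached(() -> new BigGraph("job-1", 1024)));
    }
}
```

This only helps if nothing else keeps the graph reachable, for example the 
supplier itself or the submission future.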

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
