[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jichao Wang updated FLINK-21274:
--------------------------------
    Description: 
============================= Latest issue description (2021.02.07) =============================

I want to describe the issue more concisely:

*The issue only appears in per-job mode.*

JsonResponseHistoryServerArchivist#archiveExecutionGraph submits the archive
task to ioExecutor. At the same time, ClusterEntrypoint#stopClusterServices
shuts down multiple thread pools in parallel (for example commonRpcService,
metricRegistry, and MetricRegistryImpl#executor inside
metricRegistry.shutdown()). If the archiving process takes 10 seconds,
ExecutorUtils.nonBlockingShutdown should wait 10 seconds before completing.
However, testing shows that the JobManager process exits immediately after
commonRpcService and metricRegistry have shut down. At that point
ExecutorUtils.nonBlockingShutdown is still waiting for the archiving task to
finish, so the archive is never completely written.

*There are two specific reproduction methods:*

*Method one:*

Modify the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method to 
wait 5 seconds before actually writing to HDFS (simulating a slow write speed 
scenario).
{code:java}
public static Path archiveJob(
        Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen =
                jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
The above modification will cause the archive to fail.

*Method two:*

In ClusterEntrypoint#stopClusterServices, before 
ExecutorUtils.nonBlockingShutdown is called, submit a task that waits 10 
seconds to ioExecutor.
{code:java}
ioExecutor.execute(new Runnable() {
    @Override
    public void run() {
        try {
            LOG.info("===ioExecutor before sleep");
            Thread.sleep(10000);
            LOG.info("===ioExecutor after sleep");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
ExecutorUtils.nonBlockingShutdown(
        shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor);
{code}
With this modification, "===ioExecutor before sleep" is printed, but
"===ioExecutor after sleep" is not.

*The root cause is that all user (non-daemon) threads, which live in the Akka
ActorSystem, exit while the shutdown is still waiting. The JVM then terminates,
so the daemon threads in ioExecutor cannot run to completion.*
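
The behaviour above follows from a general JVM rule: the JVM exits once the
last user (non-daemon) thread has ended and does not wait for daemon threads.
The following is a minimal sketch of that rule, not Flink code. The class
DaemonExitSketch and its methods are hypothetical, and a plain single-thread
pool with a daemon thread factory stands in for ioExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

class DaemonExitSketch {

    /** Creates daemon threads; assumed stand-in for the ioExecutor thread factory. */
    static ThreadFactory daemonFactory() {
        return runnable -> {
            Thread t = new Thread(runnable, "io-sketch");
            t.setDaemon(true);
            return t;
        };
    }

    /**
     * Submits a slow task to a daemon-thread pool. If waitForTask is true, the
     * caller blocks until the task has finished; otherwise it returns at once.
     * Returns whether the task had finished when this method returned.
     */
    static boolean runSlowTask(boolean waitForTask, long sleepMillis) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(daemonFactory());
        AtomicBoolean finished = new AtomicBoolean(false);
        Future<?> future = io.submit(() -> {
            try {
                System.out.println("===before sleep");
                Thread.sleep(sleepMillis);
                System.out.println("===after sleep");
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.shutdown(); // no new tasks; the submitted task keeps running
        if (waitForTask) {
            future.get(); // the calling (user) thread stays alive until the task ends
        }
        return finished.get();
    }

    public static void main(String[] args) throws Exception {
        boolean wait = args.length > 0 && args[0].equals("wait");
        System.out.println("finished before main returned: " + runSlowTask(wait, 2000));
        // Without "wait", main returns here. Only a daemon thread is left,
        // so the JVM exits and the sleeping task is cut off.
    }
}
```

Started without arguments, the sketch should exit before the task prints its
second message; started with the argument wait, the main thread blocks and the
task completes.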

============================= Older issue description (2021.02.04) =============================

This is part of the flink-conf.yaml used by my Flink HistoryServer; my Flink
client uses the same configuration.
{code:java}
jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
{code}
I used {color:#0747a6}flink run -m yarn-cluster 
/cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount 
job to the Yarn cluster. Normally, after the job completes, its execution 
information is archived to HDFS and then the JobManager process exits. However, 
when archiving takes a long time (for example because HDFS writes are slow), 
the upload of the job archive file fails.

The specific reproduction method is as follows:

Modify the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method to wait 5 seconds before actually writing to HDFS (simulating a slow 
write speed scenario).
{code:java}
public static Path archiveJob(
        Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen =
                jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
After making the above change, I cannot find the corresponding job on Flink's 
HistoryServer (see Figure 1.png and Figure 2.png).

I then browsed the JobManager log on Yarn (see attachment 
application_1612404624605_0010-JobManager.log for details) and did not find 
the following line in it:
{code:java}
INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.
{code}
Normally, when the job exits cleanly, this line is logged just before 
{color:#0747a6}System.exit(returnCode){color} is executed.

The JobManager log contains no exception either. Together this indicates that, 
at some point, no user thread was left in the JobManager process, so the JVM 
exited without completing the normal shutdown sequence.

Eventually I found that 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}
 shuts down multiple services (e.g. ioExecutor, metricRegistry, 
commonRpcService) asynchronously, and that the shutdown() method of 
metricRegistry in turn shuts down further services (e.g. executor). All of 
these shutdown actions run asynchronously and in parallel. If ioExecutor or 
executor finishes shutting down after metricRegistry and commonRpcService, the 
problem above occurs. Why is there no problem when ioExecutor finishes before 
metricRegistry and commonRpcService? The key difference is that the threads in 
ioExecutor are daemon threads, while the threads in metricRegistry and 
commonRpcService are user threads.

I would like to fix this bug with the change below. If this is confirmed to be 
a bug (it affects all versions from 1.9 onwards), please assign the ticket to 
me. Thank you.

Only the 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
 method needs to change:

After fixing:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

   final String clusterEntrypointName =
         clusterEntrypoint.getClass().getSimpleName();
   try {
      clusterEntrypoint.startCluster();
   } catch (ClusterEntrypointException e) {
      LOG.error(
            String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
            e);
      System.exit(STARTUP_FAILURE_RETURN_CODE);
   }

   // Block the main (user) thread until the cluster has terminated, so the
   // JVM cannot exit while only daemon threads are still running.
   int returnCode;
   Throwable throwable = null;
   try {
      returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
   } catch (Throwable e) {
      throwable = e;
      returnCode = RUNTIME_FAILURE_RETURN_CODE;
   }

   LOG.info(
         "Terminating cluster entrypoint process {} with exit code {}.",
         clusterEntrypointName,
         returnCode,
         throwable);
   System.exit(returnCode);
}
{code}
Before fixing:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

   final String clusterEntrypointName =
         clusterEntrypoint.getClass().getSimpleName();
   try {
      clusterEntrypoint.startCluster();
   } catch (ClusterEntrypointException e) {
      LOG.error(
            String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
            e);
      System.exit(STARTUP_FAILURE_RETURN_CODE);
   }

   clusterEntrypoint.getTerminationFuture().whenComplete(
         (applicationStatus, throwable) -> {
            final int returnCode;

            if (throwable != null) {
               returnCode = RUNTIME_FAILURE_RETURN_CODE;
            } else {
               returnCode = applicationStatus.processExitCode();
            }

            LOG.info(
                  "Terminating cluster entrypoint process {} with exit code {}.",
                  clusterEntrypointName,
                  returnCode,
                  throwable);
            System.exit(returnCode);
         });
}
{code}
The purpose of the change is to keep the main thread, which is a user thread, 
alive until the cluster has terminated, so that it is the last thread to exit.
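
The blocking variant can be illustrated outside Flink. This is only a sketch
under assumptions: ExitSketch, awaitExitCode, completeLater and the failure
code 1 are hypothetical, and a plain CompletableFuture<Integer> stands in for
the termination future.

```java
import java.util.concurrent.CompletableFuture;

class ExitSketch {

    /** Hypothetical stand-in for RUNTIME_FAILURE_RETURN_CODE. */
    static final int FAILURE_CODE = 1;

    /**
     * Blocks the calling thread until the future completes and maps the
     * outcome to an exit code, in the spirit of the blocking variant.
     */
    static int awaitExitCode(CompletableFuture<Integer> terminationFuture) {
        try {
            return terminationFuture.get();
        } catch (Throwable t) {
            return FAILURE_CODE;
        }
    }

    /** Completes the given future from a daemon thread after a delay. */
    static void completeLater(CompletableFuture<Integer> future, int code, long delayMillis) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                future.complete(code);
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }, "terminator-sketch");
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> terminationFuture = new CompletableFuture<>();
        completeLater(terminationFuture, 0, 1000);
        // The main thread is a user thread. Blocking here keeps the JVM alive
        // until the daemon thread has completed the future.
        int code = awaitExitCode(terminationFuture);
        System.out.println("exit code " + code);
        System.exit(code);
    }
}
```

If main instead registered a whenComplete callback and returned, nothing but
the daemon thread would remain in this sketch, and the JVM could exit before
the callback ever ran.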

> At per-job mode, during the exit of the JobManager process, if ioExecutor 
> exits at the end, the System.exit() method will not be executed.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.3, 1.10.1, 1.11.0, 1.12.0
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in 
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
