[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454 ]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 8:24 AM:
--------------------------------------------------------------

I believe my problem has nothing to do with HA: the job I submitted is the WordCount example shipped with Flink, and HA is not enabled.

Let me try to explain further.

I added some logging that prints thread information to org.apache.flink.runtime.entrypoint.ClusterEntrypoint in order to capture the state of the JobManager just before it exits. The modified code is shown below:

// code1
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    // code1-1
    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;
        LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
        LOG.info("===Current thread name is: " + Thread.currentThread().getName() + ", isDaemon: " + Thread.currentThread().isDaemon());
        printThreads();
        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }

        LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}{code}

// code2
{code:java}
private void cleanupDirectories() throws IOException {
    LOG.info("===================Starting cleanupDirectories================================");
    LOG.info("cleanupDirectories===Current thread name is: " + Thread.currentThread().getName() + ", isDaemon: " + Thread.currentThread().isDaemon());
    printThreads("cleanupDirectories");
    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);

    FileUtils.deleteDirectory(new File(webTmpDir));
}{code}

// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
    final long shutdownTimeout = configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

    synchronized (lock) {
        Throwable exception = null;

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (blobServer != null) {
            try {
                blobServer.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (haServices != null) {
            try {
                if (cleanupHaData) {
                    haServices.closeAndCleanupAllData();
                } else {
                    haServices.close();
                }
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (archivedExecutionGraphStore != null) {
            try {
                archivedExecutionGraphStore.close();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }

        if (processMetricGroup != null) {
            processMetricGroup.close();
        }

        if (metricRegistry != null) {
            LOG.info("===metricRegistry is not null");
            CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
            terminationFutures.add(futureMetricRegistry);
            futureMetricRegistry.whenComplete((aVoid, throwable) -> {
                // code3-1
                LOG.info("========================metricRegistry shutdowns successfully===================================");
                LOG.info("metricRegistry===Current thread name is: " + Thread.currentThread().getName() + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("metricRegistry");
            });
        }

        if (ioExecutor != null) {
            LOG.info("===ioExecutor is not null");
            CompletableFuture<Void> futureIoExecutor = ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor);
            terminationFutures.add(futureIoExecutor);
            futureIoExecutor.whenComplete((aVoid, throwable) -> {
                // code3-2
                LOG.info("==============ioExecutor shutdowns successfully==========================");
                LOG.info("ioExecutor===Current thread name is: " + Thread.currentThread().getName() + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("ioExecutor");
            });
        }

        if (commonRpcService != null) {
            LOG.info("===commonRpcService is not null");
            CompletableFuture<Void> futureCommonRpcService = commonRpcService.stopService();
            terminationFutures.add(futureCommonRpcService);
            futureCommonRpcService.whenComplete((aVoid, throwable) -> {
                // code3-3
                LOG.info("============================commonRpcService shutdowns successfully=====================================");
                LOG.info("commonRpcService===Current thread name is: " + Thread.currentThread().getName() + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("commonRpcService");
            });
        }

        if (exception != null) {
            terminationFutures.add(FutureUtils.completedExceptionally(exception));
        }

        return FutureUtils.completeAll(terminationFutures);
    }
}{code}
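As an aside, the fact that different threads end up executing code1-1 can be reproduced outside of Flink. The following minimal sketch is my own example (the class name WhenCompleteThreadDemo and the thread name "service-shutdown-thread" are invented for illustration, not Flink code); it shows that a callback registered with CompletableFuture#whenComplete on a not-yet-completed future typically runs on whichever thread later completes the future:

{code:java}
import java.util.concurrent.CompletableFuture;

public class WhenCompleteThreadDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> termination = new CompletableFuture<>();

        // Registered while the future is still incomplete, just like code1-1
        // registers a callback on clusterEntrypoint.getTerminationFuture().
        termination.whenComplete((ignored, throwable) ->
                System.out.println("callback runs on: " + Thread.currentThread().getName()
                        + ", daemon=" + Thread.currentThread().isDaemon()));

        // Complete the future from another thread; the callback then runs on
        // this completing thread, not on main.
        Thread completer = new Thread(() -> termination.complete(null), "service-shutdown-thread");
        completer.start();
        completer.join();
    }
}{code}

This is why the last service to finish shutting down determines the thread, and in particular the daemon flag of that thread, on which code1-1 runs.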
// code4
{code:java}
public static void printThreads(String name) {
    ThreadGroup group = Thread.currentThread().getThreadGroup();
    ThreadGroup topGroup = group;
    while (group != null) {
        topGroup = group;
        group = group.getParent();
    }
    int slackSize = topGroup.activeCount() * 2;
    Thread[] slackThreads = new Thread[slackSize];
    int actualSize = topGroup.enumerate(slackThreads);
    Thread[] actualThreads = new Thread[actualSize];
    System.arraycopy(slackThreads, 0, actualThreads, 0, actualSize);
    LOG.info(name + "===Threads size is " + actualThreads.length);
    for (Thread thread : actualThreads) {
        LOG.info(name + "===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
    }
}{code}

Description of the attached log files:

* Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log: this log corresponds to code5, where a 5-second wait was added to the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.
* Not add wait 5 seconds.log: this log corresponds to code6, where the 5-second wait was removed from the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.

// code5
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}{code}
// code6
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        // There is no logic to wait for 5 seconds here

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ... // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}{code}

I believe the thread information printed at code1-1 and in code2 is the thread information just before the JobManager process exits.

If the 5-second wait is added, code3-2 and code1-1 are not executed, which means that ioExecutor fails to exit normally; code2 is executed by the YarnJobClusterEntrypoint shutdown hook.

If the 5-second wait is removed, the program exits normally and code1-1 is executed by the flink-akka.actor.default-dispatcher-16 thread. There is also a corresponding case: if metricRegistry exits after commonRpcService, code1-1 is executed by the flink-metrics-scheduler-1 thread.

The JobManager shuts down several thread pools in parallel before the process exits, and the order in which they finish determines which thread executes code1-1. If a daemon thread pool (for example ioExecutor) is the last one to exit, code1-1 is not executed at all. This is the root cause of the problem.

If the org.apache.flink.runtime.util.ExecutorThreadFactory#newThread method of the class backing ioExecutor is changed from code7 to code8, then code1-1 is executed even if we wait 10 seconds in code5. In that case the thread executing code1-1 is ForkJoinPool.commonPool-worker-57.

// code7
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(true);
    t.setPriority(threadPriority);

    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }

    return t;
}{code}

// code8
{code:java}
@Override
public Thread newThread(Runnable runnable) {
    Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
    t.setDaemon(false);
    t.setPriority(threadPriority);

    // optional handler for uncaught exceptions
    if (exceptionHandler != null) {
        t.setUncaughtExceptionHandler(exceptionHandler);
    }

    return t;
}{code}

What do you think?
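P.S. To make the daemon-thread argument easier to verify, here is a minimal standalone sketch (my own example, not Flink code; the class name DaemonExitDemo and the thread name "io-like-daemon-thread" are invented): once the main thread returns and only daemon threads remain, the JVM terminates immediately, so work scheduled on a daemon thread, including a later call to System.exit, never happens.

{code:java}
public class DaemonExitDemo {

    public static void main(String[] args) {
        Thread ioLikeThread = new Thread(() -> {
            try {
                // Simulates the slow archive upload (the 5-second wait in code5).
                Thread.sleep(5000);
                // Analogous to code1-1: this line is never reached, because the JVM
                // already terminated when the last user thread (main) returned.
                System.out.println("would call System.exit here");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "io-like-daemon-thread");

        ioLikeThread.setDaemon(true); // same daemon flag that code7 sets for ioExecutor threads
        ioLikeThread.start();
        // main returns here; only daemon threads remain, so the process exits
        // without waiting for ioLikeThread, mirroring the missing
        // "Terminating cluster entrypoint process" log line.
    }
}{code}

Changing setDaemon(true) to setDaemon(false) (the difference between code7 and code8) keeps the JVM alive until the sleep finishes, which matches the observations above.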
> In per-job mode, if the HDFS write is slow (about 5 seconds), the Flink job
> archive file upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, application_1612404624605_0010-JobManager.log
>
> This is a partial configuration of my Flink History Server (flink-conf.yaml); it is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount job to the YARN cluster. Under normal circumstances, after the job completes, the Flink job execution information is archived to HDFS and then the JobManager process exits. However, when this archiving step takes a long time (for example because the HDFS write speed is slow), the upload of the job archive file fails.
> The issue can be reproduced as follows:
> Modify the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method to wait 5 seconds before actually writing to HDFS (simulating a slow write scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I cannot find the corresponding job on Flink's HistoryServer (see Figure 1.png and Figure 2.png).
> I then looked at the JobManager log on YARN (see the attachment application_1612404624605_0010-JobManager.log for details) and found that the following line is missing from the job log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, when a job exits normally, a similar log line is printed just before {color:#0747a6}System.exit(returnCode){color} is executed.
> Since there is no exception in the JobManager log, the situation above indicates that the JobManager ran to a point where no user thread was left in the process, which caused the program to exit without completing the normal shutdown sequence.
> Eventually I found out that multiple services (for example: ioExecutor, metricRegistry, commonRpcService) are shut down asynchronously in {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}, and that the shutdown() method of metricRegistry in turn shuts down further services (for example: executor). These shutdown actions are executed asynchronously and in parallel. If ioExecutor or executor exits last, the problem described above occurs. The root cause is that the threads in these two executors are daemon threads, while the threads of the Akka actor system are user threads.
> I would like to modify the following code to fix this bug. If it is confirmed that this is a bug (this problem affects all versions above 1.9), please assign the ticket to me, thank you.
> Only the {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color} method needs to be modified.
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     int returnCode;
>     Throwable throwable = null;
>     try {
>         returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>     } catch (Throwable e) {
>         throwable = e;
>         returnCode = RUNTIME_FAILURE_RETURN_CODE;
>     }
>
>     LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>     System.exit(returnCode);
> }{code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>         final int returnCode;
>
>         if (throwable != null) {
>             returnCode = RUNTIME_FAILURE_RETURN_CODE;
>         } else {
>             returnCode = applicationStatus.processExitCode();
>         }
>
>         LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>         System.exit(returnCode);
>     });
> }
> {code}
> The purpose of the modification is to ensure that the main thread exits last.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)