[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann reassigned FLINK-21274:
-------------------------------------

    Assignee: Jichao Wang

> At per-job mode, during the exit of the JobManager process, if ioExecutor
> exits at the end, the System.exit() method will not be executed.
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.3, 1.10.1, 1.11.0, 1.12.0
>            Reporter: Jichao Wang
>            Assignee: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 5 seconds.log, application_1612404624605_0010-JobManager.log
>
> h2. =============Latest issue description (2021.02.07)==================
> To describe the issue more concisely: *it only appears in per-job mode.*
> In JsonResponseHistoryServerArchivist#archiveExecutionGraph, the archive task is submitted to ioExecutor for execution. At the same time, ClusterEntrypoint#stopClusterServices shuts down multiple thread pools in parallel (for example commonRpcService, metricRegistry, and MetricRegistryImpl#executor, the latter inside metricRegistry.shutdown()). Suppose the archiving task takes 10 seconds to execute; ExecutorUtils.nonBlockingShutdown should then wait 10 seconds before exiting. However, testing showed that the JobManager process exits immediately after commonRpcService and metricRegistry have shut down, while ExecutorUtils.nonBlockingShutdown is still waiting for the archiving task to finish, so the archive is never completely written.
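The described behavior follows from a general JVM rule: the JVM terminates as soon as the last non-daemon (user) thread exits, no matter what daemon threads are still doing. The following standalone sketch (not Flink code; class and thread names are illustrative) reproduces the effect with a plain daemon-thread executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class DaemonShutdownDemo {

    // Set to true only if the simulated archive task runs to completion.
    static final AtomicBoolean archiveCompleted = new AtomicBoolean(false);

    public static void main(String[] args) throws Exception {
        // Executor backed by a daemon thread, like Flink's ioExecutor.
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "io-executor");
            t.setDaemon(true);
            return t;
        });

        ioExecutor.execute(() -> {
            try {
                Thread.sleep(5000); // simulate a slow archive write to HDFS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            archiveCompleted.set(true); // never reached in a standalone run
        });

        Thread.sleep(100); // give the task time to start
        // main (the last user thread) returns here; run standalone, the JVM
        // exits immediately and the daemon task is silently abandoned.
        System.out.println("archive completed: " + archiveCompleted.get());
    }
}
```

Run standalone, this prints "archive completed: false" and the process ends about five seconds early; the same mechanism cuts off the archive write in per-job mode.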
> *There are two specific ways to reproduce the issue:*
> *Method one:*
> Modify the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method to wait 5 seconds before actually writing to HDFS (simulating a slow write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> With this modification, archiving fails.
> *Method two:*
> In ClusterEntrypoint#stopClusterServices, before ExecutorUtils.nonBlockingShutdown is called, submit a task to ioExecutor that waits 10 seconds.
> {code:java}
> ioExecutor.execute(new Runnable() {
>     @Override
>     public void run() {
>         try {
>             LOG.info("===ioExecutor before sleep");
>             Thread.sleep(10000);
>             LOG.info("===ioExecutor after sleep");
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>     }
> });
> terminationFutures.add(ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor));
> {code}
> With this modification, "===ioExecutor before sleep" is printed, but "===ioExecutor after sleep" is not.
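For contrast with method two: if some user thread blocks waiting for the result of the daemon-executed work, the JVM cannot terminate early and the task always completes. That is the principle behind keeping the main thread alive until shutdown finishes, and the fix proposed later in this ticket relies on it. A minimal standalone sketch (class and method names are illustrative, not Flink code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingExitDemo {

    // Waits for a "termination future" that is completed by a daemon thread,
    // mirroring a blocking get() on the termination future in the entrypoint.
    static int awaitTermination() throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "io-executor");
            t.setDaemon(true); // same daemon property as Flink's ioExecutor
            return t;
        });

        CompletableFuture<Integer> terminationFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500); // simulate the slow archive write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0; // process exit code
        }, ioExecutor);

        // Blocking here keeps the calling (user) thread alive, so the
        // daemon task is guaranteed to finish before the process can exit.
        int returnCode = terminationFuture.get();
        ioExecutor.shutdown();
        return returnCode;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Terminating with exit code " + awaitTermination());
    }
}
```

awaitTermination() cannot return before the simulated archive write has finished, because the blocking get() keeps a user thread alive for the whole duration.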
> *The root cause of the above issue is that all user threads (in the Akka ActorSystem) exit during the wait, so the daemon threads (in ioExecutor) are killed before they can finish their work.*
>
> {color:#de350b}*If you already understand the issue, you can skip the following old version of the issue description and go directly to the comments.*{color}
>
> h2. ================Older issue description (2021.02.04)================
> This is a partial configuration of my Flink History service (flink-conf.yaml), which is also the configuration of my Flink client:
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount job to the YARN cluster. Under normal circumstances, after the job completes, the Flink job execution information is archived to HDFS and the JobManager process then exits. However, when the archiving process takes a long time (perhaps because the HDFS write speed is slow), the upload of the job archive file fails.
> The specific reproduction method is as follows:
> Modify the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method to wait 5 seconds before actually writing to HDFS (simulating a slow write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I could not find the corresponding job on Flink's HistoryServer (refer to Figure 1.png and Figure 2.png).
> I then browsed the JobManager log on YARN (see the attachment application_1612404624605_0010-JobManager.log for details) and did not find the following line in the JobManager log file:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the job exits normally, a similar log line is printed just before {color:#0747a6}System.exit(returnCode){color} is executed.
> Since no Exception appears in the JobManager log, this indicates that at some point during shutdown there were no user threads left in the JobManager process, which caused the program to exit without completing the normal shutdown sequence.
>
> Eventually I found out that multiple services (e.g.
ioExecutor, metricRegistry, commonRpcService) are shut down asynchronously in {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}, and metricRegistry's shutdown() method in turn shuts down further services (e.g. executor). All of these shutdown actions run asynchronously and in parallel. If ioExecutor or executor exits after metricRegistry and commonRpcService, the problem described above occurs. Why is there no such problem when ioExecutor exits before metricRegistry and commonRpcService? The key difference is that the threads in ioExecutor are daemon threads, while the threads in metricRegistry and commonRpcService are user threads.
>
> I propose the following change to fix this bug. If it is confirmed that this is a bug (it affects all versions from 1.9 onwards), please assign the ticket to me, thank you.
> Only the {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color} method needs to be modified.
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>     int returnCode;
>     Throwable throwable = null;
>     try {
>         returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>     } catch (Throwable e) {
>         throwable = e;
>         returnCode = RUNTIME_FAILURE_RETURN_CODE;
>     }
>     LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>     System.exit(returnCode);
> }{code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String
clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>     clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>         final int returnCode;
>         if (throwable != null) {
>             returnCode = RUNTIME_FAILURE_RETURN_CODE;
>         } else {
>             returnCode = applicationStatus.processExitCode();
>         }
>         LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>         System.exit(returnCode);
>     });
> }
> {code}
> The purpose of the modification is to ensure that the main thread exits last.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)