[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279485#comment-17279485 ]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 9:11 AM:
--------------------------------------------------------------

[~fly_in_gis] I think FLINK-21008 is about the case where an external forced-exit signal is sent to the JobManager while it is already shutting down, which may result in {{stopClusterServices}} and {{cleanupDirectories}} not being executed. My problem is different: even without any external interrupt signal, tasks submitted to the ioExecutor (for example, archiving the job information) may not run to completion while the process is exiting. In the scenario described in this ticket the job itself finishes normally; if the archiving is too slow, the archive is simply never completed.


In per-job mode, if the HDFS write is slow (about 5 seconds), the Flink job archive file upload fails
------------------------------------------------------------------------------------------------------

                 Key: FLINK-21274
                 URL: https://issues.apache.org/jira/browse/FLINK-21274
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.10.1
            Reporter: Jichao Wang
            Priority: Critical
         Attachments: 1.png, 2.png, Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 5 seconds.log, application_1612404624605_0010-JobManager.log


This is part of the configuration of my Flink HistoryServer (flink-conf.yaml); the same configuration is used by my Flink client:
{code:java}
jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
{code}
I used {color:#0747a6}flink run -m yarn-cluster /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount job to the YARN cluster. Under normal circumstances, after the job completes, the job execution information is archived to HDFS and then the JobManager process exits. However, when this archiving step takes a long time (for example because the HDFS write speed is slow), the upload of the archive file fails.

The issue can be reproduced as follows:

Modify the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method to wait 5 seconds before actually writing to HDFS (simulating a slow write):
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...
            // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }

        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
After making this change, I can no longer find the corresponding job on Flink's HistoryServer (see the attached 1.png and 2.png).

I then browsed the JobManager log on YARN (see the attachment application_1612404624605_0010-JobManager.log for details) and found that the following line is missing from the log:
{code:java}
INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.{code}
Normally, when the job exits cleanly, a line like this is printed just before {color:#0747a6}System.exit(returnCode){color} is executed.

Since the JobManager log contains no exception either, this indicates that the JobManager reached a point where no user thread was left in the process, so the JVM exited before the normal shutdown sequence could finish.

Eventually I found that several services (for example ioExecutor, metricRegistry, commonRpcService) are shut down asynchronously in {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}, and that the shutdown() method of metricRegistry in turn shuts down further components (for example its executor). All of these shutdown actions run asynchronously and in parallel. If ioExecutor or that executor is the last one still working, the problem described above occurs. The root cause is that the threads of these two executors are daemon threads, while the threads of the Akka actor system are user threads.

I hope to fix this bug with the change described below. If it is confirmed that this is a bug (the problem affects all versions above 1.9), please assign the ticket to me, thank you.
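To make the daemon-thread behaviour described above concrete before showing the fix, here is a minimal, self-contained sketch (my own illustration rather than Flink code; the single-thread executor and the 5-second task are hypothetical stand-ins for the ioExecutor and the archiving call):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DaemonShutdownDemo {

    public static void main(String[] args) {
        // Executor backed by a daemon thread, similar in spirit to the
        // ioExecutor / metric registry executor mentioned above.
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "io-daemon");
            t.setDaemon(true);
            return t;
        });

        // Stand-in for the slow archiving task (e.g. a 5-second HDFS write).
        ioExecutor.submit(() -> {
            try {
                System.out.println("archiving started");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("archiving finished"); // normally never printed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // The main (user) thread returns here. Since only daemon threads
        // remain, the JVM terminates without waiting for the archiving task.
        System.out.println("main thread done");
    }
}
{code}
If a user thread blocked until the archiving task had finished, the JVM would stay alive and the archive would be written completely; this is exactly what the proposed change achieves.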
Only the {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color} method needs to be modified.

After fixing:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    int returnCode;
    Throwable throwable = null;
    try {
        returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable e) {
        throwable = e;
        returnCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
    System.exit(returnCode);
}{code}
Before fixing:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;

        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }

        LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}
{code}
The purpose of the modification is to ensure that the main thread exits last.
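For completeness, here is a similar minimal sketch (again my own illustration, not the actual ClusterEntrypoint code; the termination future and scheduler are hypothetical) of why blocking with get() in the main thread behaves differently from registering a whenComplete callback:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingVsCallbackDemo {

    public static void main(String[] args) throws Exception {
        // Daemon-thread scheduler that completes the "termination future"
        // after 2 seconds, standing in for the asynchronous shutdown.
        ScheduledExecutorService daemonPool = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "termination-daemon");
            t.setDaemon(true);
            return t;
        });

        CompletableFuture<Integer> terminationFuture = new CompletableFuture<>();
        daemonPool.schedule(() -> terminationFuture.complete(0), 2, TimeUnit.SECONDS);

        // Variant 1 (like the code before the fix): register a callback and
        // return. The main thread finishes immediately; if no other user
        // thread is left, the JVM can exit before the callback ever runs.
        // terminationFuture.whenComplete((code, error) -> System.exit(code));

        // Variant 2 (like the proposed fix): block the main (user) thread
        // until the future completes, keeping the process alive until the
        // shutdown work has finished.
        int returnCode = terminationFuture.get();
        System.out.println("Terminating with exit code " + returnCode);
        System.exit(returnCode);
    }
}
{code}
Blocking in get() keeps a user thread alive until the termination future has completed, which is exactly the "main thread exits last" behaviour described above.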