[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jichao Wang updated FLINK-21274:
--------------------------------
    Description: 
This is a partial configuration of my Flink History service (flink-conf.yaml):
{code:java}
jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
{code}
I used {color:#0747a6}flink run -m yarn-cluster /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount job to the YARN cluster. Under normal circumstances, after the job completes, the Flink job's execution information is archived to HDFS and the JobManager process then exits. However, when this archiving takes a long time (for example, because the HDFS write speed is slow), the job archive file upload fails.

The reproduction steps are as follows:

Modify the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method to wait 5 seconds before actually writing to HDFS (simulating a slow write scenario):
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        // Injected delay to simulate a slow HDFS write.
        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
After making this change, the corresponding job can no longer be found on Flink's HistoryServer (see attachments 1.png and 2.png).

I then browsed the JobManager log on YARN (see attachment application_1612404624605_0010-JobManager.log for details) and found that the following line is missing from the log:
{code:java}
INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.
{code}
Normally, when the job exits cleanly, this line is printed just before {color:#0747a6}System.exit(returnCode){color} is executed.

Since no exception appears in the JobManager log, this situation indicates that the JobManager ran to a point where no user (non-daemon) thread remained in the process, so the JVM exited before the normal shutdown sequence could complete.
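For illustration, here is a minimal standalone Java sketch (my own demo, not Flink code) of the JVM rule at play here: as soon as the last non-daemon thread returns, the JVM exits immediately, even if daemon threads are still mid-task.
{code:java}
// Standalone demo (not Flink code): the JVM exits as soon as the last
// non-daemon thread finishes, killing daemon threads mid-task.
public class DaemonExitDemo {
    public static void main(String[] args) {
        Thread uploader = new Thread(() -> {
            try {
                Thread.sleep(5000); // stands in for the slow HDFS archive write
                System.out.println("archive uploaded"); // never printed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        uploader.setDaemon(true); // daemon, like an executor's worker threads
        uploader.start();
        // main, the only user thread, returns here; the JVM exits at once
        // and the "upload" above never completes.
    }
}
{code}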

Eventually I found that several services (for example ioExecutor, metricRegistry, and commonRpcService) are shut down asynchronously in {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}, and that the shutdown() method of metricRegistry in turn shuts down further services (for example its executor). All of these shutdown actions are executed asynchronously and in parallel. If ioExecutor or that executor happens to exit last, the problem described above occurs.
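The race can be sketched outside Flink as follows (names like slowArchive are illustrative, not actual Flink fields): the shutdown actions run in parallel on daemon threads and are combined into one future, but no user thread ever blocks on that future.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified sketch of the shutdown race (illustrative names, not Flink's):
// shutdown actions run in parallel on daemon threads; since no user thread
// waits on the combined future, the process can die before they finish.
public class ShutdownRaceDemo {
    public static void main(String[] args) {
        ExecutorService daemonPool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // worker threads are daemons
            return t;
        });

        CompletableFuture<Void> slowArchive = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000); // slow HDFS write during shutdown
                System.out.println("archive uploaded");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, daemonPool);

        CompletableFuture<Void> fastService = CompletableFuture.runAsync(
                () -> System.out.println("metric registry stopped"), daemonPool);

        CompletableFuture<Void> allStopped =
                CompletableFuture.allOf(slowArchive, fastService);

        // No user thread calls allStopped.join(), so main returns, the JVM
        // exits, and "archive uploaded" is usually never printed. Blocking
        // the main thread on this future is the essence of the fix below.
    }
}
{code}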

I would like to fix this with the change below. If this is confirmed to be a bug (it affects all versions above 1.9), please assign the ticket to me. Thank you.

Only the {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color} method needs to be modified:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

   final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
   try {
      clusterEntrypoint.startCluster();
   } catch (ClusterEntrypointException e) {
      LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
      System.exit(STARTUP_FAILURE_RETURN_CODE);
   }

   // Block the main (non-daemon) thread on the termination future, so the
   // JVM cannot exit before shutdown (including the archive upload) finishes.
   int returnCode;
   Throwable throwable = null;
   try {
      returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
   } catch (Throwable e) {
      throwable = e;
      returnCode = RUNTIME_FAILURE_RETURN_CODE;
   }

   LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
   System.exit(returnCode);
}
{code}
 
The purpose of the modification is to ensure that the main thread exits last: since the main thread is a user (non-daemon) thread, blocking it on {color:#0747a6}getTerminationFuture().get(){color} keeps the JVM alive until termination completes, so the termination log line and {color:#0747a6}System.exit(returnCode){color} are always reached.


> In per-job mode, when the upload speed of the HDFS file system is slow, the
> task exits abnormally, and the task archive file upload fails.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, 
> application_1612404624605_0010-JobManager.log
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)