Hello,

We're running Flink 1.9.2 on YARN and are seeing unexpected behavior when 
submitting jobs from multiple threads to an application's Flink cluster. 
The error reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
           at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
           at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
           at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
           at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
           at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
           at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
           at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
           at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
           at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
           at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
           at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
           at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
           ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.

Looking through the JobManager logs, I see this interesting sequence for JobID 
8e1c2fdd68feee100d8fee003efef3e2:

2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:07:25,843 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:26,109 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:28,705 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager                     - Registering job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager                     - Registered job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager                     - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO  [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.

It would appear that, for job ID 8e1c2fdd68feee100d8fee003efef3e2, the 
cluster somehow received the submission request twice. The client log shows 
only a single submission for this job:

2021-01-20 14:01:55,775 [ProductHistory-18359] INFO  RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So while the client submits the job a single time (14:01:55), the dispatcher 
logs receiving the submission twice (14:06:58 and 14:08:45), and the job then 
fails. How does this happen?
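
In case it helps anyone check their own JobManager log for the same pattern, 
here is a minimal Python sketch of the check described above. It is not part 
of Flink; the function name find_repeated_submissions is made up for 
illustration, and it assumes log entries start with a timestamp in the format 
shown in the excerpts above and may be wrapped across several lines.

```python
import re
from collections import defaultdict

# Timestamp that starts each log entry, e.g. "2021-01-20 14:06:58,225".
TIMESTAMP = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}"
# Zero-width split point in front of every entry timestamp.
ENTRY_START = re.compile(rf"(?={TIMESTAMP} )")
# Dispatcher message logged when a JobGraph submission arrives.
SUBMISSION = re.compile(
    rf"^({TIMESTAMP}) .*Received JobGraph submission ([0-9a-f]{{32}})"
)


def find_repeated_submissions(log_text):
    """Return {job_id: [timestamps]} for job IDs received more than once."""
    receipts = defaultdict(list)
    for entry in ENTRY_START.split(log_text):
        # Entries may be wrapped across lines, so collapse whitespace first.
        flat = " ".join(entry.split())
        match = SUBMISSION.match(flat)
        if match:
            receipts[match.group(2)].append(match.group(1))
    return {job: times for job, times in receipts.items() if len(times) > 1}


# Shortened sample based on the JobManager excerpt above.
sample = """\
2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91]
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan
20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91]
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job
8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST
2021).
2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30]
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan
20 14:01:42 EST 2021).
"""

if __name__ == "__main__":
    for job_id, times in find_repeated_submissions(sample).items():
        print(job_id, "received at", ", ".join(times))
```

This only reports what the dispatcher logged; it does not say whether the 
second receipt came from a client-side retry or from somewhere else.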

____________

Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.

