Hello,

We're running Flink 1.9.2 on YARN and are seeing some interesting behavior when submitting jobs to an application's Flink cluster from multiple threads. The error reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    ...
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.

Looking through the JobManager logs, I see this interesting sequence for JobID 8e1c2fdd68feee100d8fee003efef3e2:

2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:07:25,843 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:26,109 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:28,705 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registering job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registered job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.

It would appear that, for job ID 8e1c2fdd68feee100d8fee003efef3e2, the cluster somehow received the submission request twice. However, the client log shows only a single submission for this job:

2021-01-20 14:01:55,775 [ProductHistory-18359] INFO RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So while the client submits the job only once, the dispatcher somehow receives two submissions, and the second one causes the failure. How does this happen?

Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.