Hi Robert, I appreciate you having a look. I’ll have a closer look and see what I can find. Thanks!
// ah

From: Robert Metzger <rmetz...@apache.org>
Sent: Friday, January 22, 2021 2:41 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted

Hey Andreas,

thanks a lot for providing me with the full logs. The JobManager actually received 2 job submissions.

There are 2 relevant log messages:

1. "Received JobGraph submission xxx"
2. "Submitting job"

Message 1 is logged right after the dispatcher receives the JobGraph, before the duplicate submission check is done. Message 2 is logged once we know that there is no duplicate job.

We have the following log messages (which you actually posted here on the list already):

TYPE 1: 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
TYPE 2: 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
TYPE 1: 2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).

So at 14:06 you are submitting the job, and at 14:09 it fails. In between (14:08), you are trying to submit the job again, which gets (rightfully) rejected. It seems that the second submission didn't get logged properly in your client. I don't think there's a bug on Flink's side of things.
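
To check a JobManager log for this pattern, the two dispatcher messages described above can be counted per job ID. The following is a minimal sketch in Python and is not part of Flink: the function name find_duplicate_submissions and the SAMPLE list are hypothetical, and the regular expressions assume the message wording quoted in this thread (Flink 1.9.2), which may differ in other versions. It is meant for JobManager logs only, since the client log also contains a "Submitting job" line.

```python
import re
from collections import defaultdict

# Assumed message wording, copied from the dispatcher log lines in this thread.
RECEIVED = re.compile(r"Received JobGraph submission ([0-9a-f]{32})")
ACCEPTED = re.compile(r"Submitting job ([0-9a-f]{32})")
# Leading timestamp such as "2021-01-20 14:06:58,225".
TIMESTAMP = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})")


def find_duplicate_submissions(lines):
    """Return the job IDs whose JobGraph was received more than once.

    The result maps each such job ID to the timestamps at which it was
    "received" (before the duplicate check) and "accepted" (after it).
    """
    seen = defaultdict(lambda: {"received": [], "accepted": []})
    for line in lines:
        ts_match = TIMESTAMP.match(line)
        ts = ts_match.group(1) if ts_match else None
        for key, pattern in (("received", RECEIVED), ("accepted", ACCEPTED)):
            match = pattern.search(line)
            if match:
                seen[match.group(1)][key].append(ts)
    return {
        job_id: info
        for job_id, info in seen.items()
        if len(info["received"]) > 1
    }


# The three dispatcher lines quoted in this thread.
SAMPLE = [
    "2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] "
    "org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph "
    "submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).",
    "2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] "
    "org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job "
    "8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).",
    "2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] "
    "org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph "
    "submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).",
]

if __name__ == "__main__":
    for job_id, info in find_duplicate_submissions(SAMPLE).items():
        print(job_id, info)
```

A job ID with more "received" than "accepted" timestamps had at least one submission that did not pass the duplicate check, which matches the 14:08 entry above.
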
On Thu, Jan 21, 2021 at 7:17 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:

Hi Robert,

I sent you an email with instructions to create an account to view the logs through our secure repository. I’ve included the JobManager and client application logs there.

We have a thread pool that we use to submit multiple jobs in parallel, but there’s no retry logic in it: if any one job fails, it’s an overall failure for the entire application.

Regarding the gap between when the client app logs show the job as initially submitted and when the JobManager logs it as received: we’re submitting a large number of jobs as part of this application. Is it possible that it’s busy processing other jobs?

// ah

From: Robert Metzger <rmetz...@apache.org>
Sent: Thursday, January 21, 2021 10:00 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted

Thanks a lot for your message.

Why is there a difference of 5 minutes between the timestamp of the job submission from the client and the timestamp on the JobManager where the submission is received?

Is there any service / custom logic involved in the job submission (e.g. a proxy in between that has some retry mechanism, or some custom code that does retries)?

Could you provide the full JobManager logs of that timeframe, not just those messages filtered for 8e1c2fdd68feee100d8fee003efef3e2?

On Wed, Jan 20, 2021 at 10:20 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:

Hello,

We’re running 1.9.2 on YARN, and are seeing some interesting behavior when submitting jobs in a multi-threaded fashion to an application’s Flink cluster.
The error we see reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    ...
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.

Looking through the JobManager logs, I see this interesting sequence for JobID 8e1c2fdd68feee100d8fee003efef3e2:

2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:07:25,843 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:26,109 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:28,705 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registering job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registered job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.

It would appear that, for job ID 8e1c2fdd68feee100d8fee003efef3e2, the cluster somehow received the submission request twice?
The client log only shows a single submission for this job:

2021-01-20 14:01:55,775 [ProductHistory-18359] INFO RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So while the job is submitted a single time, the dispatcher somehow tries to perform two submissions, resulting in a failure. How does this happen?

____________
Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.

________________________________
Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices
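
The timing discussed in this thread can be read off the quoted timestamps. Below is a small sketch in Python using a hypothetical helper, gap_seconds. It assumes the client and JobManager hosts have synchronized clocks, which the thread does not confirm, so the first figure is only indicative.

```python
from datetime import datetime

# Timestamp layout used by the log lines quoted in this thread.
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def gap_seconds(earlier, later):
    """Return the number of seconds between two log timestamps."""
    start = datetime.strptime(earlier, LOG_TIME_FORMAT)
    end = datetime.strptime(later, LOG_TIME_FORMAT)
    return (end - start).total_seconds()


# Values copied from the client and JobManager logs above.
CLIENT_SUBMIT = "2021-01-20 14:01:55,775"    # RestClusterClient "Submitting job"
FIRST_RECEIVED = "2021-01-20 14:06:58,225"   # first "Received JobGraph submission"
SECOND_RECEIVED = "2021-01-20 14:08:45,199"  # second "Received JobGraph submission"

if __name__ == "__main__":
    print("client submit -> first receipt:", gap_seconds(CLIENT_SUBMIT, FIRST_RECEIVED))
    print("first receipt -> second receipt:", gap_seconds(FIRST_RECEIVED, SECOND_RECEIVED))
```

With these values, the first gap is about 302 seconds (the roughly 5 minutes asked about earlier in the thread) and the second is about 107 seconds.
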