[ https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744529#comment-17744529 ]
Chesnay Schepler commented on FLINK-32592:
------------------------------------------

master: 13d35365f677813d5f0090f121e14e8bdec646d1
1.17: TBD
1.16: TBD

> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> ------------------------------------------------------------
>
>                 Key: FLINK-32592
>                 URL: https://issues.apache.org/jira/browse/FLINK-32592
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.15.4, 1.18.0, 1.17.1
>            Reporter: Fabio Wanner
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>
> *Context*
> We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a single session cluster. The job submissions done by the operator happen concurrently, basically at the same time.
> Operator version: 1.5.0
> Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)
> *Problem*
> Rarely (~once every 50 deployments) one of the jobs will not be executed. In the following incident 4 jobs are deployed at the same time:
> * gorner-task-staging-e5730831
> * gorner-facility-staging-e5730831
> * gorner-aepp-staging-e5730831
> * gorner-session-staging-e5730831
>
> The operator submits the jobs, and they all get a reasonable jobID:
> {code:java}
> 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390000000000000002 to session cluster.
> 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0000000000000002 to session cluster.
> 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10000000000000002 to session cluster.
> 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0000000000000002 to session cluster.
> {code}
> In the cluster the JarRunHandler's handleRequest() method will get the request, and all 4 jobIDs are present (all args, etc. are also correct):
> {code:java}
> 2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0000000000000002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10000000000000002
> {code}
> But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, instead of getting 1 call per jobID we have 4 calls, with one of the jobIDs appearing twice:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390000000000000002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10000000000000002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
> {code}
> If this is important: the jobGraph obtained does not match the jobID. We get 2 times de59304d370b4b8e0000000000000002 but the jobgraph for this jobID is never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().
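One way a duplicate jobID like the one above can arise, assuming per-submission state is held in a single shared slot, is that a second thread overwrites the slot before the first thread reads it back. The following is a minimal sketch of that interleaving and not Flink code: `SharedContextRace`, `sharedJobId`, and the `job-A`/`job-B` IDs are hypothetical, and the latches exist only to force the unlucky ordering on every run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

final class SharedContextRace {
    // Hypothetical stand-in for per-submission state kept in one shared static slot.
    private static volatile String sharedJobId;

    /** Runs two "submissions" with a forced interleaving and returns what each read back. */
    static Map<String, String> run() {
        Map<String, String> observed = new ConcurrentHashMap<>();
        CountDownLatch aWrote = new CountDownLatch(1);
        CountDownLatch bWrote = new CountDownLatch(1);

        Thread a = new Thread(() -> {
            sharedJobId = "job-A";           // A initializes the shared slot
            aWrote.countDown();
            await(bWrote);                   // B overwrites it before A reads it back
            observed.put("A", sharedJobId);
        });
        Thread b = new Thread(() -> {
            await(aWrote);
            sharedJobId = "job-B";           // B replaces A's value
            bWrote.countDown();
            observed.put("B", sharedJobId);
        });
        a.start();
        b.start();
        join(a);
        join(b);
        return observed;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Both submissions report the same ID; "job-A" is never observed.
        System.out.println(run());
    }
}
```

In this sketch both threads end up reading `job-B`, which mirrors the pattern in the log: four calls, one ID seen twice, one ID never seen.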
> This will then lead to the job already existing:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0000000000000002]
> {code}
> But since the jobs are completely different, the execution will fail. Depending on the timing, it fails with one of the following exceptions:
> * RestHandlerException: No jobs included in application
> * ClassNotFoundException: io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor
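For readers unfamiliar with the general remedy for this class of bug, here is a minimal sketch of thread-confined state using `java.lang.ThreadLocal`. It illustrates the generic pattern only and makes no claim about what the commit referenced at the top changes in Flink; `ThreadConfinedContext`, `JOB_ID`, and the `job-A`/`job-B` IDs are hypothetical names. Both threads write before either reads, which is the ordering that would corrupt a shared slot.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

final class ThreadConfinedContext {
    // Hypothetical per-thread slot: each submitting thread sees only its own value.
    private static final ThreadLocal<String> JOB_ID = new ThreadLocal<>();

    /** Runs two concurrent "submissions" and returns the ID each one read back. */
    static Map<String, String> run() {
        Map<String, String> observed = new ConcurrentHashMap<>();
        CountDownLatch aWrote = new CountDownLatch(1);
        CountDownLatch bWrote = new CountDownLatch(1);

        Thread a = new Thread(() -> submit("A", "job-A", aWrote, bWrote, observed));
        Thread b = new Thread(() -> submit("B", "job-B", bWrote, aWrote, observed));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    private static void submit(String name, String jobId, CountDownLatch mine,
                               CountDownLatch other, Map<String, String> observed) {
        JOB_ID.set(jobId);                     // write this thread's own slot
        mine.countDown();
        try {
            other.await();                     // wait until the other thread has written too
            observed.put(name, JOB_ID.get());  // still reads this thread's own value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            JOB_ID.remove();                   // avoid leaking state into reused threads
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The `remove()` in the `finally` block matters when handler threads come from a pool, since a stale value would otherwise be visible to the next request served by the same thread.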