Hi Andreas, the problem here is that the command you're using starts a per-job cluster (you can tell from the deployment method in the stack trace, `YarnClusterDescriptor.deployJobCluster`). The `-m yarn-cluster` flag is deprecated and no longer supported; I think we should remove it completely in the near future. It was always meant to start your job in per-job mode, but unfortunately in older versions this was more or less simulated using a session cluster, so I'd say it only ever worked by accident (an "undocumented bug / feature").
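To make the session-mode workflow concrete, here is a minimal sketch of the two steps. The memory, slot, queue, and name values are carried over from the submission command quoted further down in the thread, and `application_XXXX_YYYY` is a placeholder for the application ID that `yarn-session.sh` prints when the session starts — substitute your own values throughout:

```shell
# 1) Start a long-running session cluster up front.
#    This prints the YARN application ID of the session on startup.
./bin/yarn-session.sh -d -nm app-name -qu queue-name -jm 4096m -tm 18432m -s 2

# 2) Submit each job against the running session, pointing Flink at the
#    session's application ID (placeholder below -- use the one from step 1):
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YYYY \
  --class com.class.name.Here -p 2 /path/to/artifact.jar <application-args>
```

With this setup, multiple `flink run` invocations (or multiple threads in your client) all submit to the same already-running JobManager instead of each trying to deploy its own cluster.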
What you really want to do is start a session cluster upfront and then use the `yarn-session` deployment target (where you need to provide the YARN application ID so Flink can find the active JobManager). This is well documented in the YARN section of the docs <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode> [1]. Can you please try this approach and let me know if it helps? [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode Best, D. On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] < andreas.ha...@gs.com> wrote: > Hi David, > > > > You’re correct about classpathing problems – thanks for your help in > spotting them. I was able to get past that exception by removing some > conflicting packages in my shaded JAR, but I’m seeing something else that’s > interesting. With the 2 threads trying to submit jobs, one of the threads > is able to submit and process data successfully, while the other thread fails. > > > > Log snippet: > > 2021-08-16 13:43:12,893 [thread-1] INFO YarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=4096, > taskManagerMemoryMB=18432, slotsPerTaskManager=2} > > 2021-08-16 13:43:12,893 [thread-2] INFO YarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=4096, > taskManagerMemoryMB=18432, slotsPerTaskManager=2} > > 2021-08-16 13:43:12,897 [thread-2] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-16 13:43:12,897 [thread-1] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-16 13:43:13,104 [thread-2] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-16 13:43:13,104 [thread-1] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-16 13:43:20,475 [thread-1] INFO YarnClusterDescriptor - Adding > delegation token to the AM container. 
> > 2021-08-16 13:43:20,488 [thread-1] INFO DFSClient - Created > HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536 > > 2021-08-16 13:43:20,512 [thread-1] INFO TokenCache - Got dt for > hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, > Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp) > > 2021-08-16 13:43:20,513 [thread-1] INFO Utils - Attempting to obtain > Kerberos security token for HBase > > 2021-08-16 13:43:20,513 [thread-1] INFO Utils - HBase is not available > (not packaged with this application): ClassNotFoundException : > "org.apache.hadoop.hbase.HBaseConfiguration". > > 2021-08-16 13:43:20,564 [thread-2] WARN YarnClusterDescriptor - Add job > graph to local resource fail. > > 2021-08-16 13:43:20,570 [thread-1] INFO YarnClusterDescriptor - > Submitting application master application_1628992879699_11275 > > 2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running > data flow for thread-2 > > org.apache.flink.client.deployment.ClusterDeploymentException: Could not > deploy Yarn job cluster. > > at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431) > > at > org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) > > at > org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973) > > at > org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124) > > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72) > > at > com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49) > > ... > > Caused by: java.io.IOException: Filesystem closed > > at > org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826) > > at > org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152) > > ... 
> > 2021-08-16 13:43:20,979 [thread-1] INFO TimelineClientImpl - Timeline > service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/ > > 2021-08-16 13:43:21,377 [thread-1] INFO YarnClientImpl - Submitted > application application_1628992879699_11275 > > 2021-08-16 13:43:21,377 [thread-1] INFO YarnClusterDescriptor - Waiting > for the cluster to be allocated > > 2021-08-16 13:43:21,379 [thread-1] INFO YarnClusterDescriptor - Deploying > cluster, current state ACCEPTED > > 2021-08-16 13:43:28,435 [thread-1] INFO YarnClusterDescriptor - YARN > application has been deployed successfully. > > 2021-08-16 13:43:28,436 [thread-1] INFO YarnClusterDescriptor - Found Web > Interface d279536-023.dc.gs.com:41019 of application > 'application_1628992879699_11275'. > > 2021-08-16 13:43:28,443 [thread-1] INFO AbstractJobClusterExecutor - Job > has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 > > Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 > > 2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO FlinkJobSubmitter$2 > - job completed for thread-2 with parallelism 1 > > Program execution finished > > Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished. > > > > I’ve generated and sent you a signup link to our firm’s secure > document-sharing app called Lockbox. In the repository, I’ve uploaded both > our full client and YARN app logs (named half_failure-client_log and > half_failure-yarn-log, respectively) in a directory named Flink support > logs/Flink 1.11/1.11.2_POC. The logs are quite brief – would you be able to > have a look and see if there’s something we’re doing that’s > clearly wrong? > > > > Something I did notice is that with the upgrade, our submissions now go > through this new ContextEnvironment#executeAsync method. If > it means anything, our client doesn’t require asynchronous job submission. 
> > *// *ah > > > > *From:* David Morávek <d...@apache.org> > *Sent:* Monday, August 16, 2021 6:28 AM > *To:* Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com> > *Cc:* user@flink.apache.org > *Subject:* Re: Upgrading from Flink on YARN 1.9 to 1.11 > > > > Hi Andreas, > > > > Per-job and session deployment modes should not be affected by this FLIP. > Application mode is just a new deployment mode (where the job driver runs > embedded within the JM) that co-exists with these two. > > > > From the information you've provided, I'd say your actual problem is this > exception: > > > > ``` > > Caused by: java.lang.ExceptionInInitializerError > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) > > at com.sun.jersey.api.client.Client.init(Client.java:342) > > at com.sun.jersey.api.client.Client.access$000(Client.java:118) > > at com.sun.jersey.api.client.Client$1.f(Client.java:191) > > at com.sun.jersey.api.client.Client$1.f(Client.java:187) > > at > com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) > > at com.sun.jersey.api.client.Client.<init>(Client.java:187) > > at com.sun.jersey.api.client.Client.<init>(Client.java:170) > > at > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285) > ``` > > > > I've seen this exception a few times with Hadoop already and it's usually > a dependency / class-path problem. If you google for it you'll find many > references. > > > > Best, > > D. > > > > > > On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] < > andreas.ha...@gs.com> wrote: > > Hello folks! > > > > We’re looking to upgrade from 1.9 to 1.11. 
Our Flink applications run on > YARN and each has its own cluster, with each application submitting > multiple jobs. > > > > Our current submission command looks like this: > > $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name > -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar > -application-args-go-here > > > > The behavior observed in versions <= 1.9 is the following: > > 1. A Flink cluster gets deployed to YARN > > 2. Our application code is run, building graphs and submitting jobs > > > > When we rebuilt and submit using 1.11.2, we now observe the following: > > 1. Our application code is run, building graphs and submitting jobs > > 2. A Flink cluster gets deployed to YARN once execute() is invoked > > > > I presume that this is a result of FLIP-85 [1]? > > > > This change in behavior proves to be a problem for us as our application > is multi-threaded, and each thread submits its own job to the Flink > cluster. What we see is the first thread to call execute() submits a job to > YARN, and the others fail with a ClusterDeploymentException. > > > > 2021-08-13 14:47:42,299 [flink-thread-#1] INFO YarnClusterDescriptor - > Cluster specification: ClusterSpecification{masterMemoryMB=4096, > taskManagerMemoryMB=18432, slotsPerTaskManager=2} > > 2021-08-13 14:47:42,299 [flink-thread-#2] INFO YarnClusterDescriptor - > Cluster specification: ClusterSpecification{masterMemoryMB=4096, > taskManagerMemoryMB=18432, slotsPerTaskManager=2} > > 2021-08-13 14:47:42,304 [flink-thread-#1] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-13 14:47:42,304 [flink-thread-#2] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > Listening for transport dt_socket at address: 5005 > > 2021-08-13 14:47:46,716 [flink-thread-#2] WARN PluginConfig - The plugins > directory [plugins] does not exist. 
> > 2021-08-13 14:47:46,716 [flink-thread-#1] WARN PluginConfig - The plugins > directory [plugins] does not exist. > > 2021-08-13 14:47:54,820 [flink-thread-#1] INFO YarnClusterDescriptor - > Adding delegation token to the AM container. > > 2021-08-13 14:47:54,837 [flink-thread-#1] INFO DFSClient - Created > HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536 > > 2021-08-13 14:47:54,860 [flink-thread-#1] INFO TokenCache - Got dt for > hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, > Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user) > > 2021-08-13 14:47:54,860 [flink-thread-#1] INFO Utils - Attempting to > obtain Kerberos security token for HBase > > 2021-08-13 14:47:54,861 [flink-thread-#1] INFO Utils - HBase is not > available (not packaged with this application): ClassNotFoundException : > "org.apache.hadoop.hbase.HBaseConfiguration". > > 2021-08-13 14:47:54,901 [flink-thread-#1] INFO YarnClusterDescriptor - > Submitting application master application_1628393898291_71530 > > 2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception > running data flow for flink-thread-#2 > > org.apache.flink.client.deployment.ClusterDeploymentException: Could not > deploy Yarn job cluster. > > at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431) > > at > org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) > > at > org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973) > > at > org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124) > > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72) > > ... 
> > Caused by: java.io.IOException: Filesystem closed > > at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826) > > at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152) > > at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138) > > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919) > > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114) > > ... > > Caused by: java.lang.ExceptionInInitializerError > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) > > at > com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) > > at com.sun.jersey.api.client.Client.init(Client.java:342) > > at com.sun.jersey.api.client.Client.access$000(Client.java:118) > > at com.sun.jersey.api.client.Client$1.f(Client.java:191) > > at com.sun.jersey.api.client.Client$1.f(Client.java:187) > > at > com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) > > at com.sun.jersey.api.client.Client.<init>(Client.java:187) > > at com.sun.jersey.api.client.Client.<init>(Client.java:170) > > at > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285) > > > > Is the only solution here to move to application mode [2]? Doing so would > imply a migration requirement (which may have its own set of problems.) 
> > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode > > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode > > > > Best, > > Andreas > > > ------------------------------ > > > Your Personal Data: We may collect and process information about you that > may be subject to data protection laws. For more information about how we > use and disclose your personal data, how we protect your information, our > legal basis to use your information, your rights and who you can contact, > please refer to: www.gs.com/privacy-notices >