Hi Andreas,

the problem here is that the command you're using is starting a per-job
cluster (which is obvious from the used deployment method "
YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster`
flag is deprecated and no longer supported, I think this is something we
should completely remove in the near future. Also this was always supposed
to start your job in per-job mode, but unfortunately in older versions this
was kind of simulated using session cluster, so I'd say it has just worked
by an accident (a.k.a "undocumented bug / feature").

What you really want to do is to start a session cluster upfront and than
use a `yarn-session` deployment target (where you need to provide yarn
application id so Flink can search for the active JobManager). This is well
documented in the yarn section of the docs
<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode>
[1].

Can you please try this approach a let me know if that helped?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] <
andreas.ha...@gs.com> wrote:

> Hi David,
>
>
>
> You’re correct about classpathing problems – thanks for your help in
> spotting them. I was able to get past that exception by removing some
> conflicting packages in my shaded JAR, but I’m seeing something else that’s
> interesting. With the 2 threads trying to submit jobs, one of the threads
> is able submit and process data successfully, while the other thread fails.
>
>
>
> Log snippet:
>
> 2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding
> delegation token to the AM container.
>
> 2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
>
> 2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp)
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain
> Kerberos security token for HBase
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available
> (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job
> graph to local resource fail.
>
> 2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628992879699_11275
>
> 2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running
> data flow for thread-2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
>             at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
>             at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
>
>             at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
>
>             at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
>
>             at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
>
>             at
> com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
>
> ...
>
> Caused by: java.io.IOException: Filesystem closed
>
>             at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
>
>             at
> org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
>
> ...
>
> 2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline
> service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted
> application application_1628992879699_11275
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting
> for the cluster to be allocated
>
> 2021-08-16 13:43:21,379 [thread-1] INFO  YarnClusterDescriptor - Deploying
> cluster, current state ACCEPTED
>
> 2021-08-16 13:43:28,435 [thread-1] INFO  YarnClusterDescriptor - YARN
> application has been deployed successfully.
>
> 2021-08-16 13:43:28,436 [thread-1] INFO  YarnClusterDescriptor - Found Web
> Interface d279536-023.dc.gs.com:41019 of application
> 'application_1628992879699_11275'.
>
> 2021-08-16 13:43:28,443 [thread-1] INFO  AbstractJobClusterExecutor - Job
> has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
>
> Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
>
> 2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO  FlinkJobSubmitter$2
> - job completed for thread-2 with parallelism 1
>
> Program execution finished
>
> Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.
>
>
>
> I’ve generated and sent you a signup link to our firm’s secure
> document-sharing app called Lockbox. In the repository, I’ve uploaded both
> our full client and YARN app logs (named half_failure-client_log and
> half_failure-yarn-log, respectively) in a directory named Flink support
> logs/Flink 1.11/1.11.2_POC. The logs are quite brief – would you be able to
> have a look at see if you can see if there’s something we’re doing that’s
> clearly wrong?
>
>
>
> Something I did notice is that with the upgrade, our submissions are now
> using the introduction of this ContextEnvironment#executeAsync method. If
> it means anything, our client doesn’t require asynchronous job submission.
>
> *// *ah
>
>
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Monday, August 16, 2021 6:28 AM
> *To:* Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Upgrading from Flink on YARN 1.9 to 1.11
>
>
>
> Hi Andreas,
>
>
>
> Per-job and session deployment modes should not be affected by this FLIP.
> Application mode is just a new deployment mode (where job driver runs
> embedded within JM), that co-exists with these two.
>
>
>
> From information you've provided, I'd say your actual problem is this
> exception:
>
>
>
> ```
>
> Caused by: java.lang.ExceptionInInitializerError
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
>
>         at com.sun.jersey.api.client.Client.init(Client.java:342)
>
>         at com.sun.jersey.api.client.Client.access$000(Client.java:118)
>
>         at com.sun.jersey.api.client.Client$1.f(Client.java:191)
>
>         at com.sun.jersey.api.client.Client$1.f(Client.java:187)
>
>         at
> com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
>
>         at com.sun.jersey.api.client.Client.<init>(Client.java:187)
>
>        at com.sun.jersey.api.client.Client.<init>(Client.java:170)
>
>         at
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
> ```
>
>
>
> I've seen this exception a few times with Hadoop already and it's usually
> a dependency / class-path problem. If you google for this you'll find many
> references.
>
>
>
> Best,
>
> D.
>
>
>
>
>
> On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] <
> andreas.ha...@gs.com> wrote:
>
> Hello folks!
>
>
>
> We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on
> YARN and each have their own clusters, with each application having
> multiple jobs submitted.
>
>
>
> Our current submission command looks like this:
>
> $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name
> -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar
> -application-args-go-here
>
>
>
> The behavior observed in versions <= 1.9 is the following:
>
> 1.     A Flink cluster gets deployed to YARN
>
> 2.     Our application code is run, building graphs and submitting jobs
>
>
>
> When we rebuilt and submit using 1.11.2, we now observe the following:
>
> 1.     Our application code is run, building graph and submitting jobs
>
> 2.     A Flink cluster gets deployed to YARN once execute() is invoked
>
>
>
> I presume that this is a result of FLIP-85 [1] ?
>
>
>
> This change in behavior proves to be a problem for us as our application
> is multi-threaded, and each thread submits its own job to the Flink
> cluster. What we see is the first thread to peexecute() submits a job to
> YARN, and others fail with a ClusterDeploymentException.
>
>
>
> 2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor -
> Cluster specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> Listening for transport dt_socket at address: 5005
>
> 2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Adding delegation token to the AM container.
>
> 2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
>
> 2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to
> obtain Kerberos security token for HBase
>
> 2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not
> available (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628393898291_71530
>
> 2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception
> running data flow for flink-thread-#2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
>         at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
>         at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
>
>         at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
>
>         at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
>
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
>
>         ...
>
> Caused by: java.io.IOException: Filesystem closed
>
>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
>
>         at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
>
>         at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
>
>         ...
>
> Caused by: java.lang.ExceptionInInitializerError
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
>
>         at
> com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
>
>         at com.sun.jersey.api.client.Client.init(Client.java:342)
>
>         at com.sun.jersey.api.client.Client.access$000(Client.java:118)
>
>         at com.sun.jersey.api.client.Client$1.f(Client.java:191)
>
>         at com.sun.jersey.api.client.Client$1.f(Client.java:187)
>
>         at
> com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
>
>         at com.sun.jersey.api.client.Client.<init>(Client.java:187)
>
>        at com.sun.jersey.api.client.Client.<init>(Client.java:170)
>
>         at
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
>
>
>
> Is the only solution here to move to application mode [2]? Doing so would
> imply a migration requirement (which may have its own set of problems.)
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D85-2BFlink-2BApplication-2BMode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=mnYz4bDVCgx2XkuVpk5Yd9zUWgu5ZpT__4F7X4Bw3fk&s=jNWOsyLnWqYqe1rrtYoAAvkFMqIMdw2hdO1oeAj58DM&e=>
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.11_ops_deployment_yarn-5Fsetup.html-23run-2Dan-2Dapplication-2Din-2Dapplication-2Dmode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=mnYz4bDVCgx2XkuVpk5Yd9zUWgu5ZpT__4F7X4Bw3fk&s=TYm-DbNnyhKJ8xvjIZ1rhYJ8LjO86DYVa653ZlIuA2M&e=>
>
>
>
> Best,
>
> Andreas
>
>
> ------------------------------
>
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>
>
> ------------------------------
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>

Reply via email to