Hi Dongwon, Please find the answer inline.
> why CLI accessed ZK? This is a good question. Currently, when the HA is enabled, even though we could get the JobManager rest endpoint from Yarn application report, we still have to retrieve the leader information from ZooKeeper. Please find more information in the class *RestClusterClient*. I am not aware of any potential issues if we directly retrieve rest endpoint from Yarn application report. And I think this could be a minor improvement. > Specify the "high-availability.cluster-id" to list jobs I have created a ticket for updating the documentation[1]. > About the "-m yarn-cluster" You are right. "--target yarn-per-job" is the recommended way to start a perjob cluster. The backend cli option parser is *GenericCLI*. It is also used for application mode and K8s deployment. "-m yarn-cluster" is the old way. All the cli options are parsed by FlinkYarnSessionCli. Since it is widely used, it could not be deprecated or removed very soon. "--executor" has the exactly same effect with "--target". The only different is the naming. [1]. https://issues.apache.org/jira/browse/FLINK-20866 Best, Yang Dongwon Kim <eastcirc...@gmail.com> 于2021年1月6日周三 下午6:49写道: > Hi Yang, > > I was wondering why CLI accessed ZK because, as shown in the following > lines, CLI seemed to know the address of JM by contacting AHS before > connecting to ZK. > > 2021-01-06 18:35:32,351 INFO org.apache.flink.client.cli.CliFrontend > [] - Running 'list' command. > > 2021-01-06 18:35:32,682 INFO org.apache.hadoop.yarn.client.AHSProxy > [] - Connecting to Application History server at > mobdata-devflink-nm02.dakao.io/10.93.0.91:10200 > > 2021-01-06 18:35:32,763 INFO org.apache.flink.yarn.YarnClusterDescriptor > [] - Found Web Interface > mobdata-devflink-dn03.dakao.io:37098 of application > 'application_1600163418174_0127'. > > 2021-01-06 18:35:32,773 INFO org.apache.flink.runtime.util.ZooKeeperUtils > [] - Enforcing default ACL for ZK connections > > 2021-01-06 18:35:32,774 INFO org.apache.flink.runtime.util.ZooKeeperUtils > [] - Using '/driving-habits/default' as Zookeeper > namespace. > Anyway CLI needs to know where the leader (=active) JM is located via a ZK > node and GenericCLI has to be informed of high-availability.cluster-id. > Thanks for the heads up! > > > You could also specify the "high-availability.cluster-id" so that leader >> retrieval could get the correct JobManager address. >> *flink list --target yarn-per-job -Dyarn.application.id >> <http://dyarn.application.id/>=$application_id >> -Dhigh-availability.cluster-id=$application_id* > > Okay, it checked that it works. Thank you very much :-) It will be nice > for other users if your answer is also explained on [1]. > > > And the following >> command should work with/without ZooKeeper HA configured. >> *./bin/flink list -m yarn-cluster -yid $applicationId* > > I'm very confused as there's different ways to specify YARN per-job > clusters: > - "--target yarn-per-job" is explained in the current documentation [1] > and it looks like the most recent one, so I'd rather use this one with > "-Dhigh-availability.cluster-id=$application_id" > - Is "--jobmanater yarn-per-job" a preferred way of specifying per-job > clusters and is it going to co-exist with "--target yarn-per-job" for the > future releases? but It looks old-school to me. > - There's also "--executor yarn-per-job" which seems to be deprecated soon > (explained in "flink help") > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode > > Best, > > Dongwon > > On Wed, Jan 6, 2021 at 12:33 PM Yang Wang <danrtsey...@gmail.com> wrote: > >> Hi Dongwon, >> >> I think the root cause is that GenericCLI do not override the >> "high-availability.cluster-id" with specified application id. >> The GenericCLI is activated by "--target yarn-per-job". In >> the FlinkYarnSessionCli, we have done this. And the following >> command should work with/without ZooKeeper HA configured. >> >> >> *./bin/flink list -m yarn-cluster -yid $applicationId* >> >> You could also specify the "high-availability.cluster-id" so that leader >> retrieval could get the correct JobManager address. >> >> >> *flink list --target yarn-per-job -Dyarn.application.id >> <http://Dyarn.application.id>=$application_id >> -Dhigh-availability.cluster-id=$application_id* >> >> BTW, this is not a new introduced behavior change in Flink 1.12. I >> believe it also could not work in 1.11 and 1.10. >> >> >> Best, >> Yang >> >> >> Dongwon Kim <eastcirc...@gmail.com> 于2021年1月5日周二 下午11:22写道: >> >>> Hi, >>> >>> I'm using Flink-1.12.0 and running on Hadoop YARN. >>> >>> After setting HA-related properties in flink-conf.yaml, >>> >>> high-availability: zookeeper >>> >>> high-availability.zookeeper.path.root: /recovery >>> >>> high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181 >>> >>> high-availability.storageDir: hdfs:///flink/recovery >>> >>> the following command hangs and fails: >>> >>> $ flink list --target yarn-per-job -Dyarn.application.id=$application_id >>> >>> Before setting the properties, I can see the following lines after >>> executing the above command: >>> >>> 2021-01-06 00:11:48,961 INFO >>> org.apache.flink.runtime.security.modules.HadoopModule >>> [] - Hadoop user set to deploy (auth:SIMPLE) >>> >>> 2021-01-06 00:11:48,968 INFO >>> org.apache.flink.runtime.security.modules.JaasModule >>> [] - Jaas file will be created as >>> /tmp/jaas-8522045433029410483.conf. >>> >>> 2021-01-06 00:11:48,976 INFO org.apache.flink.client.cli.CliFrontend >>> [] - Running 'list' command. >>> >>> 2021-01-06 00:11:49,316 INFO org.apache.hadoop.yarn.client.AHSProxy >>> [] - Connecting to Application History server at >>> nm02/10.93.0.91:10200 >>> >>> 2021-01-06 00:11:49,324 INFO >>> org.apache.flink.yarn.YarnClusterDescriptor [] - No >>> path for the flink jar passed. Using the location of class >>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar >>> >>> 2021-01-06 00:11:49,333 WARN >>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>> Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is >>> set.The Flink YARN Client needs one of these to be set to properly load the >>> Hadoop configuration for accessing YARN. >>> >>> 2021-01-06 00:11:49,404 INFO >>> org.apache.flink.yarn.YarnClusterDescriptor [] - Found >>> Web Interface dn03:37098 of application 'application_1600163418174_0127'. >>> >>> 2021-01-06 00:11:49,758 INFO org.apache.flink.client.cli.CliFrontend >>> [] - Waiting for response... >>> >>> Waiting for response... >>> >>> 2021-01-06 00:11:49,863 INFO org.apache.flink.client.cli.CliFrontend >>> [] - Successfully retrieved list of jobs >>> >>> ------------------ Running/Restarting Jobs ------------------- >>> >>> 31.12.2020 01:22:34 : 76fc265c44ef44ae343ab15868155de6 : stream >>> calculator (RUNNING) >>> >>> -------------------------------------------------------------- >>> >>> No scheduled jobs. >>> >>> After: >>> >>> 2021-01-06 00:06:38,971 INFO >>> org.apache.flink.runtime.security.modules.HadoopModule >>> [] - Hadoop user set to deploy (auth:SIMPLE) >>> >>> 2021-01-06 00:06:38,976 INFO >>> org.apache.flink.runtime.security.modules.JaasModule >>> [] - Jaas file will be created as >>> /tmp/jaas-3613274701724362777.conf. >>> >>> 2021-01-06 00:06:38,982 INFO org.apache.flink.client.cli.CliFrontend >>> [] - Running 'list' command. >>> >>> 2021-01-06 00:06:39,304 INFO org.apache.hadoop.yarn.client.AHSProxy >>> [] - Connecting to Application History server at >>> nm02/10.93.0.91:10200 >>> >>> 2021-01-06 00:06:39,312 INFO >>> org.apache.flink.yarn.YarnClusterDescriptor [] - No >>> path for the flink jar passed. Using the location of class >>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar >>> >>> 2021-01-06 00:06:39,320 WARN >>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>> Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is >>> set.The Flink YARN Client needs one of these to be set to properly load the >>> Hadoop configuration for accessing YARN. >>> >>> 2021-01-06 00:06:39,388 INFO >>> org.apache.flink.yarn.YarnClusterDescriptor [] - Found >>> Web Interface dn03:37098 of application 'application_1600163418174_0127'. >>> >>> 2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils >>> [] - Enforcing default ACL for ZK connections >>> >>> 2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils >>> [] - Using '/recovery/default' as Zookeeper namespace. >>> >>> 2021-01-06 00:06:39,425 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility >>> [] - Running in ZooKeeper 3.4.x compatibility mode >>> >>> 2021-01-06 00:06:39,425 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility >>> [] - Using emulated InjectSessionExpiration >>> >>> 2021-01-06 00:06:39,447 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>> [] - Starting >>> >>> 2021-01-06 00:06:39,455 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper >>> [] - Initiating client connection, connectString=nm01:2181, >>> >>> nm02:2181,nm03:2181 sessionTimeout=60000 >>> watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@7668d560 >>> >>> 2021-01-06 00:06:39,466 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>> [] - Default schema >>> >>> 2021-01-06 00:06:39,466 WARN >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>> [] - SASL configuration failed: javax.security.auth.login.LoginException: >>> No JAAS configuration section named 'Client' was found in specified JAAS >>> configuration file: '/tmp/jaas-3613274701724362777.conf'. Will continue >>> connection to Zookeeper server without SASL authentication, if Zookeeper >>> server allows it. >>> >>> 2021-01-06 00:06:39,467 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>> [] - Opening socket connection to server nm01/10.93.0.32:2181 >>> >>> 2021-01-06 00:06:39,467 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>> [] - Socket connection established to nm01/10.93.0.32:2181, initiating >>> session >>> >>> 2021-01-06 00:06:39,467 ERROR >>> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - >>> Authentication failed >>> >>> 2021-01-06 00:06:39,477 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>> [] - Session establishment complete on server nm01/10.93.0.32:2181, >>> sessionid = 0x176d1f2c2280016, negotiated timeout = 60000 >>> >>> 2021-01-06 00:06:39,478 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager >>> [] - State change: CONNECTED >>> >>> 2021-01-06 00:06:39,658 INFO >>> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService >>> [] - Starting DefaultLeaderRetrievalService with >>> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}. >>> >>> 2021-01-06 00:06:39,667 INFO org.apache.flink.client.cli.CliFrontend >>> [] - Waiting for response... >>> >>> Waiting for response... >>> >>> >>> # here it took almost 30 seconds >>> >>> >>> 2021-01-06 00:07:09,670 INFO >>> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService >>> [] - Stopping DefaultLeaderRetrievalService. >>> >>> 2021-01-06 00:07:09,670 INFO >>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver >>> [] - Closing >>> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}. >>> >>> 2021-01-06 00:07:09,671 INFO >>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>> [] - backgroundOperationsLoop exiting >>> >>> 2021-01-06 00:07:09,679 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper >>> [] - Session: 0x176d1f2c2280016 closed >>> >>> 2021-01-06 00:07:09,679 INFO >>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>> [] - EventThread shut down for session: 0x176d1f2c2280016 >>> >>> 2021-01-06 00:07:09,680 ERROR org.apache.flink.client.cli.CliFrontend >>> [] - Error while running the command. >>> >>> org.apache.flink.util.FlinkException: Failed to retrieve job list. >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at java.security.AccessController.doPrivileged(Native Method) >>> ~[?:1.8.0_222] >>> >>> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_222] >>> >>> at >>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) >>> [hadoop-common-3.1.1.3.1.4.0-315.jar:?] >>> >>> at >>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>> [flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) >>> [flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> Caused by: java.util.concurrent.TimeoutException >>> >>> at >>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549) >>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>> >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> ~[?:1.8.0_222] >>> >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> ~[?:1.8.0_222] >>> >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >>> ~[?:1.8.0_222] >>> >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >>> ~[?:1.8.0_222] >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> ~[?:1.8.0_222] >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> ~[?:1.8.0_222] >>> >>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222] >>> >>> >>> ------------------------------------------------------------ >>> >>> The program finished with the following exception: >>> >>> >>> org.apache.flink.util.FlinkException: Failed to retrieve job list. >>> >>> at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436) >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418) >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919) >>> >>> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415) >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977) >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) >>> >>> at java.security.AccessController.doPrivileged(Native Method) >>> >>> at javax.security.auth.Subject.doAs(Subject.java:422) >>> >>> at >>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) >>> >>> at >>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>> >>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) >>> >>> Caused by: java.util.concurrent.TimeoutException >>> >>> at >>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168) >>> >>> at >>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>> >>> at >>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549) >>> >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >>> >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> >>> at java.lang.Thread.run(Thread.java:748) >>> >>> Why is the zookeeper specified for HA used in this process? >>> >>> No way to avoid such behavior? >>> >>> Best, >>> >>> Dongwon >>> >>> >>> >>>