Thanks Yang for the very detailed explanation! Wow, I really appreciate it.
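To summarize for anyone who finds this thread later: with the ZooKeeper HA settings from my flink-conf.yaml (quoted further down in this thread), the two commands below work for me. The quorum hosts, paths, and the $application_id placeholder are of course specific to my setup, so adjust them accordingly.

# HA-related properties in flink-conf.yaml (same as in my original mail below)
high-availability: zookeeper
high-availability.zookeeper.path.root: /recovery
high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181
high-availability.storageDir: hdfs:///flink/recovery

# new-style CLI: also pass the application id as the HA cluster id,
# so that leader retrieval looks up the correct ZooKeeper node
flink list --target yarn-per-job -Dyarn.application.id=$application_id -Dhigh-availability.cluster-id=$application_id

# old-style CLI: FlinkYarnSessionCli derives the cluster id from -yid by itself
flink list -m yarn-cluster -yid $application_id

With the cluster id set, the 'list' command returns the running jobs instead of timing out while waiting for a response.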
Best, Dongwon On Wed, Jan 6, 2021 at 10:17 PM Yang Wang <danrtsey...@gmail.com> wrote: > Hi Dongwon, > > Please find the answers inline. > > > why CLI accessed ZK? > This is a good question. Currently, when HA is enabled, even though we > could get the JobManager rest endpoint from the Yarn application report, we > still have to retrieve the leader information from ZooKeeper. Please find > more information in the class *RestClusterClient*. I am not aware of any > potential issues if we directly retrieve the rest endpoint from the Yarn > application report, and I think this could be a minor improvement. > > > Specify the "high-availability.cluster-id" to list jobs > I have created a ticket for updating the documentation [1]. > > > About the "-m yarn-cluster" > You are right. "--target yarn-per-job" is the recommended way to start a > per-job cluster. The CLI option parser behind it is *GenericCLI*, which is also > used for application mode and K8s deployment. "-m yarn-cluster" is the old > way; all of its CLI options are parsed by FlinkYarnSessionCli. Since it is > widely used, it cannot be deprecated or removed very soon. "--executor" > has exactly the same effect as "--target"; the only difference is the > naming. > > [1]. https://issues.apache.org/jira/browse/FLINK-20866 > > Best, > Yang > > > Dongwon Kim <eastcirc...@gmail.com> wrote on Wed, Jan 6, 2021 at 6:49 PM: > >> Hi Yang, >> >> I was wondering why the CLI accessed ZK because, as shown in the following >> lines, the CLI seemed to already know the address of the JM by contacting the AHS before >> connecting to ZK. >> >> 2021-01-06 18:35:32,351 INFO org.apache.flink.client.cli.CliFrontend >> [] - Running 'list' command. >> >> 2021-01-06 18:35:32,682 INFO org.apache.hadoop.yarn.client.AHSProxy >> [] - Connecting to Application History server at >> mobdata-devflink-nm02.dakao.io/10.93.0.91:10200 >> >> 2021-01-06 18:35:32,763 INFO org.apache.flink.yarn.YarnClusterDescriptor >> [] - Found Web Interface >> mobdata-devflink-dn03.dakao.io:37098 of application >> 'application_1600163418174_0127'. >> >> 2021-01-06 18:35:32,773 INFO org.apache.flink.runtime.util.ZooKeeperUtils >> [] - Enforcing default ACL for ZK connections >> >> 2021-01-06 18:35:32,774 INFO org.apache.flink.runtime.util.ZooKeeperUtils >> [] - Using '/driving-habits/default' as Zookeeper >> namespace. >> Anyway, the CLI needs to learn where the leader (=active) JM is located via a >> ZK node, and GenericCLI has to be informed of high-availability.cluster-id. >> Thanks for the heads up! >> >> >> You could also specify the "high-availability.cluster-id" so that leader >>> retrieval could get the correct JobManager address. >>> *flink list --target yarn-per-job -Dyarn.application.id=$application_id >>> -Dhigh-availability.cluster-id=$application_id* >> >> Okay, I checked that it works. Thank you very much :-) It would be nice >> for other users if your answer were also explained in [1]. >> >> >> And the following >>> command should work with/without ZooKeeper HA configured. >>> *./bin/flink list -m yarn-cluster -yid $applicationId* >> >> I'm very confused as there are different ways to specify YARN per-job >> clusters: >> - "--target yarn-per-job" is explained in the current documentation [1] >> and it looks like the most recent one, so I'd rather use this one with >> "-Dhigh-availability.cluster-id=$application_id" >> - Is "-m yarn-cluster" (the "--jobmanager" option) a preferred way of specifying per-job >> clusters, and is it going to co-exist with "--target yarn-per-job" in >> future releases? 
It looks a bit old-school to me, though. >> - There's also "--executor yarn-per-job", which seems like it will be deprecated >> soon (as explained in "flink help") >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode >> >> Best, >> >> Dongwon >> >> On Wed, Jan 6, 2021 at 12:33 PM Yang Wang <danrtsey...@gmail.com> wrote: >> >>> Hi Dongwon, >>> >>> I think the root cause is that GenericCLI does not override the >>> "high-availability.cluster-id" with the specified application id. >>> The GenericCLI is activated by "--target yarn-per-job". In >>> the FlinkYarnSessionCli, we have already done this. And the following >>> command should work with/without ZooKeeper HA configured. >>> >>> >>> *./bin/flink list -m yarn-cluster -yid $applicationId* >>> >>> You could also specify the "high-availability.cluster-id" so that leader >>> retrieval could get the correct JobManager address. >>> >>> >>> *flink list --target yarn-per-job -Dyarn.application.id=$application_id >>> -Dhigh-availability.cluster-id=$application_id* >>> >>> BTW, this is not a newly introduced behavior change in Flink 1.12. I >>> believe it does not work in 1.11 and 1.10 either. >>> >>> >>> Best, >>> Yang >>> >>> >>> Dongwon Kim <eastcirc...@gmail.com> wrote on Tue, Jan 5, 2021 at 11:22 PM: >>> >>>> Hi, >>>> >>>> I'm using Flink 1.12.0 and running on Hadoop YARN. >>>> >>>> After setting HA-related properties in flink-conf.yaml, >>>> >>>> high-availability: zookeeper >>>> >>>> high-availability.zookeeper.path.root: /recovery >>>> >>>> high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181 >>>> >>>> high-availability.storageDir: hdfs:///flink/recovery >>>> >>>> the following command hangs and fails: >>>> >>>> $ flink list --target yarn-per-job -Dyarn.application.id=$ >>>> application_id >>>> >>>> Before setting the properties, I can see the following lines after >>>> executing the above command: >>>> >>>> 2021-01-06 00:11:48,961 INFO >>>> org.apache.flink.runtime.security.modules.HadoopModule >>>> [] - Hadoop user set to deploy (auth:SIMPLE) >>>> >>>> 2021-01-06 00:11:48,968 INFO >>>> org.apache.flink.runtime.security.modules.JaasModule >>>> [] - Jaas file will be created as >>>> /tmp/jaas-8522045433029410483.conf. >>>> >>>> 2021-01-06 00:11:48,976 INFO org.apache.flink.client.cli.CliFrontend >>>> [] - Running 'list' command. >>>> >>>> 2021-01-06 00:11:49,316 INFO org.apache.hadoop.yarn.client.AHSProxy >>>> [] - Connecting to Application History server at >>>> nm02/10.93.0.91:10200 >>>> >>>> 2021-01-06 00:11:49,324 INFO >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - No >>>> path for the flink jar passed. Using the location of class >>>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar >>>> >>>> 2021-01-06 00:11:49,333 WARN >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>>> Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is >>>> set.The Flink YARN Client needs one of these to be set to properly load the >>>> Hadoop configuration for accessing YARN. >>>> >>>> 2021-01-06 00:11:49,404 INFO >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>>> Found Web Interface dn03:37098 of application >>>> 'application_1600163418174_0127'. >>>> >>>> 2021-01-06 00:11:49,758 INFO org.apache.flink.client.cli.CliFrontend >>>> [] - Waiting for response... >>>> >>>> Waiting for response... 
>>>> >>>> 2021-01-06 00:11:49,863 INFO org.apache.flink.client.cli.CliFrontend >>>> [] - Successfully retrieved list of jobs >>>> >>>> ------------------ Running/Restarting Jobs ------------------- >>>> >>>> 31.12.2020 01:22:34 : 76fc265c44ef44ae343ab15868155de6 : stream >>>> calculator (RUNNING) >>>> >>>> -------------------------------------------------------------- >>>> >>>> No scheduled jobs. >>>> >>>> After: >>>> >>>> 2021-01-06 00:06:38,971 INFO >>>> org.apache.flink.runtime.security.modules.HadoopModule >>>> [] - Hadoop user set to deploy (auth:SIMPLE) >>>> >>>> 2021-01-06 00:06:38,976 INFO >>>> org.apache.flink.runtime.security.modules.JaasModule >>>> [] - Jaas file will be created as >>>> /tmp/jaas-3613274701724362777.conf. >>>> >>>> 2021-01-06 00:06:38,982 INFO org.apache.flink.client.cli.CliFrontend >>>> [] - Running 'list' command. >>>> >>>> 2021-01-06 00:06:39,304 INFO org.apache.hadoop.yarn.client.AHSProxy >>>> [] - Connecting to Application History server at >>>> nm02/10.93.0.91:10200 >>>> >>>> 2021-01-06 00:06:39,312 INFO >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - No >>>> path for the flink jar passed. Using the location of class >>>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar >>>> >>>> 2021-01-06 00:06:39,320 WARN >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>>> Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is >>>> set.The Flink YARN Client needs one of these to be set to properly load the >>>> Hadoop configuration for accessing YARN. >>>> >>>> 2021-01-06 00:06:39,388 INFO >>>> org.apache.flink.yarn.YarnClusterDescriptor [] - >>>> Found Web Interface dn03:37098 of application >>>> 'application_1600163418174_0127'. >>>> >>>> 2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils >>>> [] - Enforcing default ACL for ZK connections >>>> >>>> 2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils >>>> [] - Using '/recovery/default' as Zookeeper namespace. >>>> >>>> 2021-01-06 00:06:39,425 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility >>>> [] - Running in ZooKeeper 3.4.x compatibility mode >>>> >>>> 2021-01-06 00:06:39,425 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility >>>> [] - Using emulated InjectSessionExpiration >>>> >>>> 2021-01-06 00:06:39,447 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>>> [] - Starting >>>> >>>> 2021-01-06 00:06:39,455 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper >>>> [] - Initiating client connection, connectString=nm01:2181, >>>> >>>> nm02:2181,nm03:2181 sessionTimeout=60000 >>>> watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@7668d560 >>>> >>>> 2021-01-06 00:06:39,466 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>>> [] - Default schema >>>> >>>> 2021-01-06 00:06:39,466 WARN >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>>> [] - SASL configuration failed: javax.security.auth.login.LoginException: >>>> No JAAS configuration section named 'Client' was found in specified JAAS >>>> configuration file: '/tmp/jaas-3613274701724362777.conf'. Will continue >>>> connection to Zookeeper server without SASL authentication, if Zookeeper >>>> server allows it. 
>>>> >>>> 2021-01-06 00:06:39,467 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>>> [] - Opening socket connection to server nm01/10.93.0.32:2181 >>>> >>>> 2021-01-06 00:06:39,467 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>>> [] - Socket connection established to nm01/10.93.0.32:2181, initiating >>>> session >>>> >>>> 2021-01-06 00:06:39,467 ERROR >>>> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - >>>> Authentication failed >>>> >>>> 2021-01-06 00:06:39,477 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>>> [] - Session establishment complete on server nm01/10.93.0.32:2181, >>>> sessionid = 0x176d1f2c2280016, negotiated timeout = 60000 >>>> >>>> 2021-01-06 00:06:39,478 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager >>>> [] - State change: CONNECTED >>>> >>>> 2021-01-06 00:06:39,658 INFO >>>> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService >>>> [] - Starting DefaultLeaderRetrievalService with >>>> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}. >>>> >>>> 2021-01-06 00:06:39,667 INFO org.apache.flink.client.cli.CliFrontend >>>> [] - Waiting for response... >>>> >>>> Waiting for response... >>>> >>>> >>>> # here it took almost 30 seconds >>>> >>>> >>>> 2021-01-06 00:07:09,670 INFO >>>> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService >>>> [] - Stopping DefaultLeaderRetrievalService. >>>> >>>> 2021-01-06 00:07:09,670 INFO >>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver >>>> [] - Closing >>>> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}. >>>> >>>> 2021-01-06 00:07:09,671 INFO >>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl >>>> [] - backgroundOperationsLoop exiting >>>> >>>> 2021-01-06 00:07:09,679 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper >>>> [] - Session: 0x176d1f2c2280016 closed >>>> >>>> 2021-01-06 00:07:09,679 INFO >>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn >>>> [] - EventThread shut down for session: 0x176d1f2c2280016 >>>> >>>> 2021-01-06 00:07:09,680 ERROR org.apache.flink.client.cli.CliFrontend >>>> [] - Error while running the command. >>>> >>>> org.apache.flink.util.FlinkException: Failed to retrieve job list. 
>>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> ~[?:1.8.0_222] >>>> >>>> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_222] >>>> >>>> at >>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) >>>> [hadoop-common-3.1.1.3.1.4.0-315.jar:?] >>>> >>>> at >>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>>> [flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) >>>> [flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> Caused by: java.util.concurrent.TimeoutException >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549) >>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0] >>>> >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>> ~[?:1.8.0_222] >>>> >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> ~[?:1.8.0_222] >>>> >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >>>> ~[?:1.8.0_222] >>>> >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >>>> ~[?:1.8.0_222] >>>> >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>> ~[?:1.8.0_222] >>>> >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>> ~[?:1.8.0_222] >>>> >>>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222] >>>> >>>> >>>> ------------------------------------------------------------ >>>> >>>> The program finished with the following exception: >>>> >>>> >>>> org.apache.flink.util.FlinkException: Failed to retrieve job list. 
>>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436) >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418) >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919) >>>> >>>> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415) >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977) >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) >>>> >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> >>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>> >>>> at >>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) >>>> >>>> at >>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>>> >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) >>>> >>>> Caused by: java.util.concurrent.TimeoutException >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168) >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>>> >>>> at >>>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549) >>>> >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>> >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >>>> >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >>>> >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>> >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>> >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> Why is the zookeeper specified for HA used in this process? >>>> >>>> No way to avoid such behavior? >>>> >>>> Best, >>>> >>>> Dongwon >>>> >>>> >>>> >>>>