Hi,

We're having a problem setting up two Flink clusters that share the same 
ZooKeeper ensemble, and we wonder whether anyone has seen it before or has any 
advice.

Our setup is two AWS EMR clusters running Flink (v1.7.2), both of which use a 
single ZooKeeper cluster (v3.4.6-1569965) for their HA configuration. As a 
cost-saving measure, we have the clusters configured to terminate at 19:00 and 
restart at 08:00 each day.

Cluster 1 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 6 -tm 9472 -jm 2048 -s 8

high-availability: zookeeper
high-availability.cluster-id: /cluster1
high-availability.storageDir: s3://xxxx/flink/cluster1/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Cluster 2 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 2 -tm 6144 -jm 6144 -s 1

high-availability: zookeeper
high-availability.cluster-id: /cluster2
high-availability.storageDir: s3://xxxx/flink/cluster2/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181
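
To make the intended separation explicit, here is a minimal Python sketch of 
the znodes we would expect each cluster to use. It assumes that Flink forms its 
ZooKeeper namespace by appending high-availability.cluster-id to 
high-availability.zookeeper.path.root, and that the lock names from our client 
log sit under a /leader sub-path of that namespace. The helper name ha_paths is 
made up for illustration and is not part of Flink.

```python
# Sketch only: assumes namespace = path.root + cluster-id, and that the
# lock names seen in our client log live under <namespace>/leader.
def ha_paths(path_root, cluster_id, locks=("rest_server_lock", "dispatcher_lock")):
    namespace = path_root.rstrip("/") + "/" + cluster_id.strip("/")
    return [f"{namespace}/leader/{lock}" for lock in locks]

cluster1 = ha_paths("/flink", "/cluster1")
cluster2 = ha_paths("/flink", "/cluster2")

for path in cluster1 + cluster2:
    print(path)

# If the assumption holds, the two clusters should share no znodes.
overlap = set(cluster1) & set(cluster2)
print("overlap:", overlap)
```

If that assumption is right, the two configurations should not collide, which 
is why the failure surprises us.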


Since upgrading from Flink 1.6.1 to 1.7.2, we've found that whichever cluster 
happens to start second fails to start, with the following error:


2019-03-07 10:59:05,142 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 10:59:05,166 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 10:59:05,171 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,184 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,187 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 0x6b691520c6774910 closed
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread shut down for session: 0x6b691520c6774910
2019-03-07 11:00:05,408 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed application application_1551946032263_0005
2019-03-07 11:00:05,488 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Error while running the Flink Yarn session.
org.apache.flink.util.FlinkException: Could not write the Yarn connection information.
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:636)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:810)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:810)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader address and leader session ID.
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:134)
        at org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:513)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:613)
        ... 6 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:259)
        at scala.concurrent.Await$.$anonfun$result$1(package.scala:215)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:142)
        at scala.concurrent.Await.result(package.scala)
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:132)
        ... 8 more
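
As a quick sanity check that the client waits out the full timeout rather than 
failing early, this small Python sketch computes the gap between the last 
"Starting" and the first "Stopping" line for /leader/dispatcher_lock. The 
timestamps are copied from the log above; the variable names are illustrative 
only.

```python
from datetime import datetime, timedelta

# Timestamp layout used in the client log (milliseconds after the comma).
FMT = "%Y-%m-%d %H:%M:%S,%f"

started = datetime.strptime("2019-03-07 10:59:05,171", FMT)
stopped = datetime.strptime("2019-03-07 11:00:05,184", FMT)

# Integer milliseconds between the two log lines.
elapsed_ms = (stopped - started) // timedelta(milliseconds=1)
print(f"waited {elapsed_ms} ms before giving up")
```

The gap is just over the 60000 milliseconds reported in the TimeoutException, 
so the client seems to wait the whole time without ever seeing a leader 
address.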

Further attempts to start the cluster fail in the same way, and the only 
solution seems to be to clear out the HA information in ZooKeeper (and on the 
filesystem). After doing this, the cluster starts successfully, with no jobs 
running.
Cluster 2 usually starts up first and succeeds, but we've seen it happen both 
ways around. We've also tried swapping the values of cluster-id and path.root 
for each cluster, which results in the same problem.


The problem appears to be that the YARN session client can't find the 
dispatcher leader. With logging turned up to DEBUG, the jobmanager log suggests 
that this election completed successfully:
2019/03/07 10:59:02,781 DEBUG org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Grant leadership to contender akka.tcp://fl...@ip-10-100-30-114.eu-west-1.compute.internal:44315/user/dispatcher with session ID fd044f52-b05c-4feb-9ccf-4c8f18ddf18c.
2019/03/07 10:59:08,035 DEBUG org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://fl...@ip-10-100-30-114.eu-west-1.compute.internal:44315/user/dispatcher accepted leadership with fencing token 9ccf4c8f18ddf18cfd044f52b05c4feb. Start recovered jobs.
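
The session ID and the fencing token in these two lines look different, but 
they appear to be the same 128-bit value with its two 64-bit halves swapped. 
A minimal Python check, using only the values from the log, is below. The 
half-swap is our observation about these two lines, not something we have 
confirmed as documented Flink behaviour.

```python
import uuid

# Values copied from the two DEBUG lines in the jobmanager log.
session_id = uuid.UUID("fd044f52-b05c-4feb-9ccf-4c8f18ddf18c")
fencing_token = "9ccf4c8f18ddf18cfd044f52b05c4feb"

# 32 hex characters; swap the upper and lower 16-character halves.
hex32 = session_id.hex
swapped = hex32[16:] + hex32[:16]

print("same leadership session:", swapped == fencing_token)
```

If that reading is right, the dispatcher accepted exactly the leadership it was 
granted, which makes the client-side timeout harder to explain.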

There don't seem to be any errors in the jobmanager log.

Any advice on how to start these two clusters, or suggestions for other ways to 
solve or debug this, would be gratefully received.

Best regards,

Mark Harris


