[ https://issues.apache.org/jira/browse/FLINK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449260#comment-15449260 ]
ASF GitHub Bot commented on FLINK-4516:
---------------------------------------

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2427#discussion_r76813885

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 	}
+
+	/**
+	 * Callback method when current resourceManager is granted leadership
+	 *
+	 * @param newLeaderSessionID unique leadershipID
+	 */
+	void handleGrantLeadership(final UUID newLeaderSessionID) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+				leaderSessionID = newLeaderSessionID;
+				// confirming the leader session ID might be blocking,
+				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+			}
+		});
+	}
+
+	/**
+	 * Callback method when current resourceManager lose leadership.
+	 */
+	void handleRevokeLeadership() {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was revoked leadership.", getAddress());
+				jobMasterGateways.clear();
+				leaderSessionID = null;
+			}
+		});
+	}
+
+	/**
+	 * Callback method when an error happened to current resourceManager on leader election
+	 * @param e
+	 */
+	void onLeaderElectionError(final Throwable e) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("ResourceManager received an error from the LeaderElectionService.", e);
+				// terminate ResourceManager in case of an error
+				shutDown();
+			}
+		});
+	}
+
+	private class ResourceManagerLeaderContender implements LeaderContender {
+
+		@Override
+		public void grantLeadership(UUID leaderSessionID) {
+			handleGrantLeadership(leaderSessionID);
+		}
+
+		@Override
+		public void revokeLeadership() {
+			handleRevokeLeadership();
+		}
+
+		@Override
+		public String getAddress() {
+			return getAddress();
--- End diff --

This is a recursive call, which will lead to a StackOverflowError.


> ResourceManager leadership election
> -----------------------------------
>
>                 Key: FLINK-4516
>                 URL: https://issues.apache.org/jira/browse/FLINK-4516
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> 1. When a resourceManager is started, it first starts the leadership election service and takes part in contending for leadership.
> 2. Every resourceManager contains a ResourceManagerLeaderContender. When it is granted leadership, it starts the SlotManager and other main components; when its leadership is revoked, it stops all its components and clears everything.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
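The reviewer's point about `getAddress()` can be reproduced in isolation. When an inner class declares its own `getAddress()`, an unqualified `getAddress()` call inside it resolves to the inner class's method (Java searches the innermost enclosing class that has a method of that name), so the method calls itself until the stack overflows. One way to reach the enclosing instance is a qualified call such as `ResourceManager.this.getAddress()`. The sketch below is a minimal, self-contained reproduction, assuming hypothetical stand-in names (`ResourceManagerAddressDemo`, a one-method `AddressProvider` interface, the address string `"rm-address"`); it is not the actual Flink `ResourceManager` or `LeaderContender` code.

```java
import java.util.Objects;

/**
 * Hypothetical reproduction of the recursion flagged in the review.
 * All names are illustrative stand-ins, not Flink classes.
 */
final class ResourceManagerAddressDemo {

    /** Hypothetical, simplified contender interface: only the address accessor. */
    interface AddressProvider {
        String getAddress();
    }

    private final String address;

    ResourceManagerAddressDemo(String address) {
        this.address = Objects.requireNonNull(address, "address");
    }

    /** Outer method that the inner classes intend to delegate to. */
    String getAddress() {
        return address;
    }

    /** Mirrors the diff: the unqualified call binds to this inner method itself. */
    private class RecursiveContender implements AddressProvider {
        @Override
        public String getAddress() {
            return getAddress(); // calls RecursiveContender.getAddress() again, forever
        }
    }

    /** One possible fix: qualify the call so it targets the enclosing instance. */
    private class FixedContender implements AddressProvider {
        @Override
        public String getAddress() {
            return ResourceManagerAddressDemo.this.getAddress();
        }
    }

    AddressProvider recursiveContender() {
        return new RecursiveContender();
    }

    AddressProvider fixedContender() {
        return new FixedContender();
    }

    public static void main(String[] args) {
        ResourceManagerAddressDemo rm = new ResourceManagerAddressDemo("rm-address");
        System.out.println("fixed: " + rm.fixedContender().getAddress()); // prints "fixed: rm-address"
        try {
            rm.recursiveContender().getAddress();
        } catch (StackOverflowError e) {
            System.out.println("recursive: StackOverflowError");
        }
    }
}
```

Renaming the outer accessor (or passing the address into the contender's constructor) would also avoid the name clash; the qualified `Outer.this` form is simply the smallest change to the code under review.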
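The two numbered steps in the issue, together with the `runAsync(...)` wrapping in the diff, describe a lifecycle in which leadership callbacks are queued onto one main thread and then start or stop components. A minimal sketch under stated assumptions: a single-threaded `ExecutorService` stands in for the endpoint's `runAsync`, while `SlotManager`, `registerJobMaster`, and the query helpers are hypothetical illustrations, not Flink's API. Job master gateways are modeled as plain address strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of the grant/revoke lifecycle from FLINK-4516; not Flink code. */
final class LeadershipLifecycleSketch {

    /** Hypothetical stand-in for the SlotManager named in the issue. */
    static final class SlotManager {
        private boolean running;
        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    // Single-threaded executor standing in for runAsync(...): tasks run one at a
    // time in submission order, so the fields below are only touched by that thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    private final SlotManager slotManager = new SlotManager();
    private final Map<String, String> jobMasterGateways = new HashMap<>();
    private UUID leaderSessionID; // null while this instance is not the leader

    private void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    /** Runs a query on the main thread and waits for its result (demo helper). */
    private <T> T callSync(Callable<T> query) {
        try {
            return mainThread.submit(query).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Issue step 2: on grant, remember the session ID and start main components. */
    void handleGrantLeadership(UUID newLeaderSessionID) {
        runAsync(() -> {
            leaderSessionID = newLeaderSessionID;
            slotManager.start();
        });
    }

    /** Issue step 2: on revoke, stop components and clear all leader state. */
    void handleRevokeLeadership() {
        runAsync(() -> {
            slotManager.stop();
            jobMasterGateways.clear();
            leaderSessionID = null;
        });
    }

    /** Hypothetical registration call, accepted only while this instance leads. */
    void registerJobMaster(String jobId, String address) {
        runAsync(() -> {
            if (leaderSessionID != null) {
                jobMasterGateways.put(jobId, address);
            }
        });
    }

    boolean isLeader() { return callSync(() -> leaderSessionID != null); }
    boolean slotManagerRunning() { return callSync(slotManager::isRunning); }
    int registeredJobMasters() { return callSync(jobMasterGateways::size); }

    void shutDown() { mainThread.shutdown(); }

    public static void main(String[] args) {
        LeadershipLifecycleSketch rm = new LeadershipLifecycleSketch();
        rm.handleGrantLeadership(UUID.randomUUID());
        rm.registerJobMaster("job-1", "jm-address-1");
        System.out.println(rm.isLeader() + " " + rm.slotManagerRunning() + " " + rm.registeredJobMasters()); // true true 1
        rm.handleRevokeLeadership();
        System.out.println(rm.isLeader() + " " + rm.slotManagerRunning() + " " + rm.registeredJobMasters()); // false false 0
        rm.shutDown();
    }
}
```

Because every callback and query in this sketch runs on the same thread in submission order, a revoke can never interleave with a half-finished registration, so the leader state needs no locks. That is presumably why the callbacks in the diff are wrapped in `runAsync`, although the sketch only models the idea and not Flink's actual RPC endpoint.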