[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431931#comment-15431931 ]

ASF GitHub Bot commented on FLINK-4348:
---------------------------------------

Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789279

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
         private final ExecutionContext executionContext;
         private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +    private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +    private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +    private final LeaderElectionService leaderElectionService;
    +    private UUID leaderSessionID;
    +    // TODO private final SlotManager slotManager;

    -    public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +    public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
             super(rpcService);
             this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
                 Preconditions.checkNotNull(executorService));
             this.jobMasterGateways = new HashMap<>();
    +        this.taskExecutorGateways = new HashMap<>();
    +        this.heartbeatSchedulers = new HashMap<>();
    +        this.leaderElectionService = leaderElectionService;
    +        leaderSessionID = null;
    +        // TODO this.slotManager = null;
    +    }
    +
    +    @Override
    +    public void start() {
    +        // start a leader
    +        try {
    +            leaderElectionService.start(new ResourceManagerLeaderContender());
    +            super.start();
    +        } catch (Exception e) {
    +            log.error("a fatal error happened when start resourceManager", e);
    --- End diff --

    ok


> Implement communication from ResourceManager to TaskManager
> -----------------------------------------------------------
>
>                 Key: FLINK-4348
>                 URL: https://issues.apache.org/jira/browse/FLINK-4348
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: zhangjing
>
> There are mainly 3 logics initiated from RM to TM:
> * Heartbeat, RM use heartbeat to sync with TM's slot status
> * SlotRequest, when RM decides to assign slot to JM, should first try to send
> request to TM for slot. TM can either accept or reject this request.
> * FailureNotify, in some corner cases, TM will be marked as invalid by
> cluster manager master(e.g. yarn master), but TM itself does not realize. RM
> should send failure notify to TM and TM can terminate itself



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
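
For readers following the thread, the three RM-to-TM interactions listed in the quoted issue description (Heartbeat, SlotRequest, FailureNotify) could be sketched roughly as below. This is only an illustrative in-memory model, not the code from the pull request: every type and method name here (TaskExecutorSketch, InMemoryTaskExecutor, ResourceManagerSketch, heartbeat, requestSlot, notifyFailure) is hypothetical, and plain method calls stand in for the RPC layer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; none of these types are Flink's actual API.
interface TaskExecutorSketch {
    /** Heartbeat: the TM reports how many slots it currently has free. */
    int heartbeat();

    /** SlotRequest: the TM may accept (true) or reject (false) the request. */
    boolean requestSlot(String jobId);

    /** FailureNotify: tells the TM it was marked invalid so it can terminate itself. */
    void notifyFailure(String reason);
}

// Trivial stand-in for a TaskManager, used only to exercise the sketch.
class InMemoryTaskExecutor implements TaskExecutorSketch {
    private int freeSlots;
    private boolean terminated;

    InMemoryTaskExecutor(int freeSlots) {
        this.freeSlots = freeSlots;
    }

    public int heartbeat() {
        return freeSlots;
    }

    public boolean requestSlot(String jobId) {
        if (terminated || freeSlots == 0) {
            return false; // reject: nothing left to hand out
        }
        freeSlots--;
        return true;
    }

    public void notifyFailure(String reason) {
        terminated = true;
    }

    boolean isTerminated() {
        return terminated;
    }
}

class ResourceManagerSketch {
    private final Map<String, TaskExecutorSketch> taskExecutors = new HashMap<>();
    private final Map<String, Integer> lastReportedFreeSlots = new HashMap<>();

    void register(String resourceId, TaskExecutorSketch tm) {
        taskExecutors.put(resourceId, tm);
    }

    /** Heartbeat round: refresh the RM's view of every TM's slot status. */
    void heartbeatAll() {
        for (Map.Entry<String, TaskExecutorSketch> e : taskExecutors.entrySet()) {
            lastReportedFreeSlots.put(e.getKey(), e.getValue().heartbeat());
        }
    }

    /** SlotRequest: ask the TM first; the slot is assigned only if it accepts. */
    boolean assignSlot(String resourceId, String jobId) {
        TaskExecutorSketch tm = taskExecutors.get(resourceId);
        return tm != null && tm.requestSlot(jobId);
    }

    /** FailureNotify: forget the TM and tell it to shut down. */
    void markInvalid(String resourceId, String reason) {
        TaskExecutorSketch tm = taskExecutors.remove(resourceId);
        lastReportedFreeSlots.remove(resourceId);
        if (tm != null) {
            tm.notifyFailure(reason);
        }
    }

    /** Last free-slot count seen via heartbeat, or null if the TM is unknown. */
    Integer freeSlotsOf(String resourceId) {
        return lastReportedFreeSlots.get(resourceId);
    }
}
```

In this sketch the RM's slot view only changes on a heartbeat round, so a slot handed out through assignSlot shows up in freeSlotsOf only after the next heartbeatAll call. Whether the actual implementation updates its view eagerly or waits for the heartbeat is not stated in this thread.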