[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15532781#comment-15532781 ]
ASF GitHub Bot commented on FLINK-4406: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2565#discussion_r81134995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat return Acknowledge.get(); } - /** - * Triggers the registration of the job master at the resource manager. - * - * @param address Address of the resource manager - */ - @RpcMethod - public void registerAtResourceManager(final String address) { - //TODO:: register at the RM + //---------------------------------------------------------------------------------------------- + // Internal methods + // ---------------------------------------------------------------------------------------------- + + private void handleFatalError(final Throwable cause) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); + shutDown(); + jobCompletionActions.onFatalError(cause); + } + }); + } + + private void notifyOfNewResourceManagerLeader( + final String resourceManagerAddress, final UUID resourceManagerLeaderId) + { + // IMPORTANT: executed by main thread to avoid concurrence + runAsync(new Runnable() { + @Override + public void run() { + if (resourceManagerConnection != null) { + if (resourceManagerAddress != null) { + if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) + { + // both address and leader id are not changed, we can keep the old connection + return; + } + log.info("ResourceManager leader changed from {} to {}. 
Registering at new leader.", + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + } + else { + log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", + resourceManagerConnection.getTargetAddress()); + } + } + + closeResourceManagerConnection(); + + if (resourceManagerAddress != null) { + log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + resourceManagerConnection = new ResourceManagerConnection( + log, jobGraph.getJobID(), leaderSessionID, + resourceManagerAddress, resourceManagerLeaderId, executionContext); + resourceManagerConnection.start(); + } + } + }); + } + + private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) { + // IMPORTANT: executed by main thread to avoid concurrence + runAsync(new Runnable() { + @Override + public void run() { + // only process if we haven't been connected in the meantime + if (resourceManagerGateway == null) { + // double check the connection is still effective + if (resourceManagerConnection != null) { --- End diff -- Couldn't this resource manager connection be replaced by a new one in the meantime? > Implement job master registration at resource manager > ----------------------------------------------------- > > Key: FLINK-4406 > URL: https://issues.apache.org/jira/browse/FLINK-4406 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Wenlong Lyu > Assignee: Kurt Young > > The Job Master needs to register at the Resource Manager when starting, then > watch for leadership changes of the RM and trigger re-registration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)