[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15532781#comment-15532781 ]

ASF GitHub Bot commented on FLINK-4406:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81134995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
                return Acknowledge.get();
        }
     
    -   /**
    -    * Triggers the registration of the job master at the resource manager.
    -    *
    -    * @param address Address of the resource manager
    -    */
    -   @RpcMethod
    -   public void registerAtResourceManager(final String address) {
    -           //TODO:: register at the RM
    +   //----------------------------------------------------------------------------------------------
    +   // Internal methods
    +   // ----------------------------------------------------------------------------------------------
    +
    +   private void handleFatalError(final Throwable cause) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +                           shutDown();
    +                           jobCompletionActions.onFatalError(cause);
    +                   }
    +           });
    +   }
    +
    +   private void notifyOfNewResourceManagerLeader(
    +           final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +   {
    +           // IMPORTANT: executed by main thread to avoid concurrence
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           if (resourceManagerConnection != null) {
    +                                   if (resourceManagerAddress != null) {
    +                                           if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +                                                   && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +                                           {
    +                                                   // both address and leader id are not changed, we can keep the old connection
    +                                                   return;
    +                                           }
    +                                           log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +                                                   resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +                                   }
    +                                   else {
    +                                           log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +                                                   resourceManagerConnection.getTargetAddress());
    +                                   }
    +                           }
    +
    +                           closeResourceManagerConnection();
    +
    +                           if (resourceManagerAddress != null) {
    +                                   log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +                                   resourceManagerConnection = new ResourceManagerConnection(
    +                                           log, jobGraph.getJobID(), leaderSessionID,
    +                                           resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +                                   resourceManagerConnection.start();
    +                           }
    +                   }
    +           });
    +   }
    +
    +   private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +           // IMPORTANT: executed by main thread to avoid concurrence
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           // only process if we haven't been connected in the meantime
    +                           if (resourceManagerGateway == null) {
    +                                   // double check the connection is still effective
    +                                   if (resourceManagerConnection != null) {
    --- End diff --
    
    Couldn't this resource manager connection be replaced by a new one in the meantime?
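A minimal sketch of one way to close the race raised above, assuming the registration callback is told which connection produced the success: the handler, running on the single main thread, accepts the result only if that connection is still the current one. All names here (StaleRegistrationGuard, Connection, connectTo, onRegistrationSuccess(Connection), drainMainThread) are hypothetical and are not Flink's API; a plain queue stands in for the endpoint's main thread behind runAsync.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

// Hypothetical, simplified model of the review concern: a registration success
// may arrive for a connection that has since been replaced by a newer one.
public class StaleRegistrationGuard {

    // Stand-in for ResourceManagerConnection; only its identity and target matter here.
    static final class Connection {
        final String targetAddress;
        final UUID targetLeaderId;

        Connection(String targetAddress, UUID targetLeaderId) {
            this.targetAddress = targetAddress;
            this.targetLeaderId = targetLeaderId;
        }
    }

    // Single-threaded "main thread": tasks queued by runAsync run one at a time, in order.
    private final Queue<Runnable> mainThread = new ArrayDeque<>();

    private Connection currentConnection;
    private Connection registeredConnection; // set when a success is accepted, cleared on reconnect

    void runAsync(Runnable task) {
        mainThread.add(task);
    }

    void drainMainThread() {
        Runnable task;
        while ((task = mainThread.poll()) != null) {
            task.run();
        }
    }

    // Replace the current connection, e.g. after a ResourceManager leader change.
    void connectTo(String address, UUID leaderId) {
        runAsync(() -> {
            currentConnection = new Connection(address, leaderId);
            registeredConnection = null;
        });
    }

    // The callback carries the connection that succeeded, so the main thread can
    // drop results coming from a connection that is no longer the current one.
    void onRegistrationSuccess(Connection origin) {
        runAsync(() -> {
            if (origin != currentConnection) {
                return; // stale: the connection was replaced in the meantime
            }
            registeredConnection = origin;
        });
    }

    Connection current() { return currentConnection; }

    Connection registered() { return registeredConnection; }
}
```

Comparing object identity instead of address and leader id means that even a reconnect to the same leader invalidates a late success from the earlier attempt. For example: connect to "rm-1", drain, capture current(), queue connectTo("rm-2", ...) followed by the late success for the first connection, then drain; registered() stays null.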


> Implement job master registration at resource manager
> -----------------------------------------------------
>
>                 Key: FLINK-4406
>                 URL: https://issues.apache.org/jira/browse/FLINK-4406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Wenlong Lyu
>            Assignee: Kurt Young
>
> The Job Master needs to register with the Resource Manager when starting, and
> then watch for leadership changes of the RM and trigger re-registration.
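The re-registration behaviour described in the issue, as implemented by the branches of notifyOfNewResourceManagerLeader in the diff, can be condensed into a pure decision function. This is a hypothetical sketch: LeaderChangeDecision, Action and decide are illustrative names only, and the method in the diff closes and starts connections directly rather than returning a value.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of the decision taken when a new ResourceManager leader is
// reported, mirroring the branches of notifyOfNewResourceManagerLeader.
public class LeaderChangeDecision {

    enum Action { KEEP_CONNECTION, DISCONNECT_AND_WAIT, RECONNECT }

    // currentAddress/currentLeaderId are null when no connection exists yet.
    static Action decide(String currentAddress, UUID currentLeaderId,
                         String newAddress, UUID newLeaderId) {
        if (newAddress == null) {
            // The RM lost leadership and no new leader is known: close and wait.
            return Action.DISCONNECT_AND_WAIT;
        }
        if (currentAddress != null
                && currentAddress.equals(newAddress)
                && Objects.equals(currentLeaderId, newLeaderId)) {
            // Same address and same leader id: the existing connection stays valid.
            return Action.KEEP_CONNECTION;
        }
        // New leader (or first connection): close any old connection and register anew.
        return Action.RECONNECT;
    }
}
```

Keeping the decision free of side effects makes each branch easy to test in isolation; the caller would then perform the close/start steps on the main thread.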



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
