GitHub user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81134995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
                return Acknowledge.get();
        }
     
    -   /**
    -    * Triggers the registration of the job master at the resource manager.
    -    *
    -    * @param address Address of the resource manager
    -    */
    -   @RpcMethod
    -   public void registerAtResourceManager(final String address) {
    -           //TODO:: register at the RM
    +   //----------------------------------------------------------------------------------------------
    +   // Internal methods
    +   // ----------------------------------------------------------------------------------------------
    +
    +   private void handleFatalError(final Throwable cause) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +                           shutDown();
    +                           jobCompletionActions.onFatalError(cause);
    +                   }
    +           });
    +   }
    +
    +   private void notifyOfNewResourceManagerLeader(
    +           final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +   {
    +           // IMPORTANT: executed by main thread to avoid concurrence
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           if (resourceManagerConnection != null) {
    +                                   if (resourceManagerAddress != null) {
    +                                           if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +                                                   && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +                                           {
    +                                                   // both address and leader id are not changed, we can keep the old connection
    +                                                   return;
    +                                           }
    +                                           log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +                                                   resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +                                   }
    +                                   else {
    +                                           log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +                                                   resourceManagerConnection.getTargetAddress());
    +                                   }
    +                           }
    +
    +                           closeResourceManagerConnection();
    +
    +                           if (resourceManagerAddress != null) {
    +                                   log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +                                   resourceManagerConnection = new ResourceManagerConnection(
    +                                           log, jobGraph.getJobID(), leaderSessionID,
    +                                           resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +                                   resourceManagerConnection.start();
    +                           }
    +                   }
    +           });
    +   }
    +
    +   private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +           // IMPORTANT: executed by main thread to avoid concurrence
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           // only process if we haven't been connected in the meantime
    +                           if (resourceManagerGateway == null) {
    +                                   // double check the connection is still effective
    +                                   if (resourceManagerConnection != null) {
    --- End diff --
    
    Couldn't this resource manager connection be replaced by a new one in the meantime?
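A minimal sketch of one way to guard against that, assuming the success callback can be told which connection produced it: the callback carries the connection instance, and the main thread drops the result if that instance is no longer the current connection. A non-null check on `resourceManagerConnection` by itself cannot distinguish the old connection from a newer one created by a later leader notification. All names below (`ConnectionGuardSketch`, `RmConnection`, `notifyOfNewLeader`, `onRegistrationSuccess`, `registeredAt`) are hypothetical simplifications, not the actual `JobMaster` code, and a single-thread executor stands in for the RPC endpoint's main thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the JobMaster's ResourceManager
// connection handling. Names are illustrative only, not Flink's API.
final class ConnectionGuardSketch {

    // One connection attempt to one ResourceManager leader address.
    static final class RmConnection {
        final String targetAddress;

        RmConnection(String targetAddress) {
            this.targetAddress = targetAddress;
        }
    }

    // A single daemon thread plays the role of the RPC endpoint's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "main-thread");
        t.setDaemon(true);
        return t;
    });

    // Both fields are only read and written on mainThread, so no locking.
    private RmConnection currentConnection;
    private String registeredAt;

    private void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    // A leader change replaces the current connection (or clears it on null).
    void notifyOfNewLeader(String address) {
        runAsync(() -> {
            currentConnection = (address == null) ? null : new RmConnection(address);
            registeredAt = null;
        });
    }

    // The callback carries the connection that registered successfully, so the
    // main thread can ignore a result from a connection that was replaced
    // between scheduling this task and running it.
    void onRegistrationSuccess(RmConnection source) {
        runAsync(() -> {
            if (source != currentConnection) {
                return; // stale success from a superseded connection
            }
            registeredAt = source.targetAddress;
        });
    }

    // Reads also run on mainThread; FIFO ordering means they observe every
    // previously submitted action.
    RmConnection currentConnectionSnapshot() throws Exception {
        return mainThread.submit(() -> currentConnection).get();
    }

    String registeredAt() throws Exception {
        return mainThread.submit(() -> registeredAt).get();
    }

    void shutDown() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Identity comparison works in this sketch because every leader notification creates a fresh connection object; comparing the target address and leader ID instead would be a possible alternative, assuming the success message carried them.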

