[ 
https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15385799#comment-15385799
 ] 

ASF GitHub Bot commented on FLINK-4152:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2257#discussion_r71517956
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -405,36 +405,23 @@ class JobManager(
     
           currentResourceManager match {
             case Some(rm) =>
    -          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    -          future.onComplete {
    -            case scala.util.Success(response) =>
    -              // the resource manager is available and answered
    -              self ! response
    -            case scala.util.Failure(t) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(msg)))(timeout)
    +          future.onFailure {
    +            case t: Throwable =>
                   t match {
                     case _: TimeoutException =>
                       log.info("Attempt to register resource at ResourceManager timed out. Retrying")
                     case _ =>
                       log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
                   }
    -              // slow or unreachable resource manager, register anyway and let the rm reconnect
    -              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
                   self ! decorateMessage(new ReconnectResourceManager(rm))
               }(context.dispatcher)
     
             case None =>
               log.info("Task Manager Registration but not connected to ResourceManager")
    -          // ResourceManager not yet available
    -          // sending task manager information later upon ResourceManager registration
    -          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
           }
     
    -    case msg: RegisterResourceSuccessful =>
    -
    -      val originalMsg = msg.getRegistrationMessage
    -      val taskManager = msg.getTaskManager
    -
    -      // ResourceManager knows about the resource, now let's try to register TaskManager
    +      // ResourceManager is told about the resource, now let's try to register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
    --- End diff --
    
    Would it make sense to first attempt to register the task manager at the
    JobManager and only notify the ResourceManager once that has succeeded?
    That is what the comment for `registeredWorkers` states now, and I think
    that ordering would be preferable.
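
A minimal sketch of the ordering suggested above, for illustration only. `TaskManagerId`, `ResourceManagerGateway`, and `Registry` are hypothetical stand-ins rather than Flink classes, and the actor messaging from the diff is replaced by plain method calls:

```scala
// Hypothetical stand-ins; none of these types are Flink's actual classes.
final case class TaskManagerId(value: String)

trait ResourceManagerGateway {
  def notifyRegistered(id: TaskManagerId): Unit
}

final class Registry(resourceManager: Option[ResourceManagerGateway]) {
  private val registered = scala.collection.mutable.Set.empty[TaskManagerId]

  /** Registers locally first; the ResourceManager is notified only on success. */
  def register(id: TaskManagerId): Boolean = {
    // Step 1: local registration. `add` returns false if the id was already present.
    val newlyRegistered = registered.add(id)
    if (newlyRegistered) {
      // Step 2: local registration succeeded, so notify the ResourceManager if one is connected.
      resourceManager.foreach(_.notifyRegistered(id))
    }
    newlyRegistered
  }
}
```

With this ordering, a duplicate or failed registration never reaches the ResourceManager, and a missing ResourceManager does not block the local registration.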


> TaskManager registration exponential backoff doesn't work
> ---------------------------------------------------------
>
>                 Key: FLINK-4152
>                 URL: https://issues.apache.org/jira/browse/FLINK-4152
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, TaskManager, YARN Client
>            Reporter: Robert Metzger
>            Assignee: Till Rohrmann
>         Attachments: logs.tgz
>
>
> While testing Flink 1.1 I've found that the TaskManagers are logging many 
> messages when registering at the JobManager.
> This is the log file: 
> https://gist.github.com/rmetzger/0cebe0419cdef4507b1e8a42e33ef294
> It's logging more than 3000 messages in less than a minute. I don't think that 
> this is the expected behavior.
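
For comparison, here is a rough sketch of how few attempts a working exponential backoff would produce in the same time span. The helper names and the 500 ms initial delay and 30 s cap used in the usage note are illustrative assumptions, not values taken from this issue or from Flink's configuration:

```scala
import scala.concurrent.duration._

// Endless sequence of retry delays: starts at `initial`, doubles each time, capped at `max`.
def backoffDelays(initial: FiniteDuration, max: FiniteDuration): Iterator[FiniteDuration] = {
  require(initial > Duration.Zero, "initial delay must be positive")
  Iterator.iterate(initial.min(max))(d => (d * 2L).min(max))
}

// Number of attempts that fire within `window`, with the first attempt at t = 0.
def attemptsWithin(window: FiniteDuration, initial: FiniteDuration, max: FiniteDuration): Int = {
  // Running sum of the delays gives the time at which each attempt fires.
  val fireTimes = backoffDelays(initial, max).scanLeft(Duration.Zero)(_ + _)
  fireTimes.takeWhile(_ <= window).size
}
```

Under those assumed values, `attemptsWithin(60.seconds, 500.millis, 30.seconds)` yields 7 attempts in the first minute, which is far below the more than 3000 log messages reported above.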



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
