[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291708#comment-14291708 ]
ASF GitHub Bot commented on FLINK-1352:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/328#discussion_r23523674

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
         }
       private def tryJobManagerRegistration(): Unit = {
    -    registrationAttempts = 0
    -    import context.dispatcher
    -    registrationScheduler = Some(context.system.scheduler.schedule(
    -      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
    -      self, RegisterAtJobManager))
    +    registrationDuration = 0 seconds
    +
    +    registered = false
    +
    +    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
       }
       override def receiveWithLogMessages: Receive = {
         case RegisterAtJobManager => {
    -      registrationAttempts += 1
    +      if(!registered) {
    +        registrationDuration += registrationDelay
    +        // double delay for exponential backoff
    +        registrationDelay *= 2
    -      if (registered) {
    -        registrationScheduler.foreach(_.cancel())
    -      }
    -      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
    +        if (registrationDuration > maxRegistrationDuration) {
    +          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
    -        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
    -          registrationAttempts)
    -        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +            maxRegistrationDuration)
    -        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    -      }
    -      else {
    -        log.error("TaskManager could not register at JobManager.");
    -        self ! PoisonPill
    +          self ! PoisonPill
    +        } else if (!registered) {
    +          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
    +            s"Attempt")
    +          val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    +
    +          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
    +        }
           }
         }
         case AcknowledgeRegistration(id, blobPort) => {
    -      if (!registered) {
    +      if(!registered) {
    +        finishRegistration(id, blobPort)
             registered = true
    -        currentJobManager = sender
    -        instanceID = id
    -
    -        context.watch(currentJobManager)
    -
    -        log.info("TaskManager successfully registered at JobManager {}.",
    -          currentJobManager.path.toString)
    -
    -        setupNetworkEnvironment()
    -        setupLibraryCacheManager(blobPort)
    +      } else {
    +        if (log.isDebugEnabled) {
    +          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
    +            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
    +        }
    +      }
    +    }
    -        heartbeatScheduler = Some(context.system.scheduler.schedule(
    -          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
    +    case AlreadyRegistered(id, blobPort) =>
    +      if(!registered) {
    +        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
    +          "though it has not yet finished the registration process.", self.path, sender.path)
    -        profiler foreach {
    -          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
    +        finishRegistration(id, blobPort)
    +        registered = true
    +      } else {
    +        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
    +        if(log.isDebugEnabled){
    +          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
    +            self.path, sender.path)
             }
    +      }
    -        for (listener <- waitForRegistration) {
    -          listener ! RegisteredAtJobManager
    -        }
    +    case RefuseRegistration(reason) =>
    +      if(!registered) {
    +        log.error("The registration of task manager {} was refused by the job manager {} " +
    +          "because {}.", self.path, jobManagerAkkaURL, reason)
    -        waitForRegistration.clear()
    +        // Shut task manager down
    +        self ! PoisonPill
    +      } else {
    +        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
    +        if(log.isDebugEnabled) {
    --- End diff --

    I was wondering whether this gives us any valuable information for bug-tracking purposes. You're right that it should not happen very often, so it probably won't hurt much.

> Buggy registration from TaskManager to JobManager
> -------------------------------------------------
>
>                 Key: FLINK-1352
>                 URL: https://issues.apache.org/jira/browse/FLINK-1352
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, TaskManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> The JobManager's InstanceManager may refuse the registration attempt from a
> TaskManager because it already has this TaskManager connected or, in the
> future, because the TaskManager has been blacklisted as unreliable.
> Upon a refused registration, the instance ID is null to signal the refusal.
> The TaskManager reacts incorrectly to such messages, assuming a successful
> registration.
> Possible solution: the JobManager sends back a dedicated "RegistrationRefused"
> message if the instance manager returns null as the registration result. If
> the TaskManager receives that before being registered, it knows that the
> registration response was lost (which should not happen on TCP and would
> indicate a corrupt connection).
> Follow-up question: does it make sense to have the TaskManager try
> indefinitely to connect to the JobManager, with an increasing interval (from
> seconds to minutes)?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
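A minimal, Akka-free sketch of the retry policy in the diff, written under stated assumptions: the object `RegistrationBackoff`, the `Reply` and `Outcome` types and the `register` function are hypothetical and not part of Flink's API, and replies are modeled as a synchronous function of the attempt number rather than as actor messages. Each wait is twice as long as the previous one, and registration is abandoned once the accumulated waiting time exceeds the maximum registration duration. An `AlreadyRegistered` reply completes the registration like an acknowledgement, while a refusal stops it immediately.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

// Hypothetical sketch of the registration loop from the diff: every tick waits
// `delay`, adds it to the elapsed time, doubles the delay (exponential backoff)
// and gives up once the elapsed time exceeds `maxDuration`.
// Replies are modeled synchronously; the real TaskManager receives actor messages.
object RegistrationBackoff {

  // Hypothetical stand-ins for the JobManager replies handled in the diff.
  sealed trait Reply
  case object Acknowledge extends Reply
  case object AlreadyRegistered extends Reply
  final case class Refuse(reason: String) extends Reply
  case object NoReply extends Reply // request or response lost; retry on the next tick

  sealed trait Outcome
  final case class Registered(attempts: Int, elapsed: FiniteDuration) extends Outcome
  final case class Refused(reason: String) extends Outcome
  final case class GaveUp(attempts: Int, elapsed: FiniteDuration) extends Outcome

  def register(
      initialDelay: FiniteDuration,
      maxDuration: FiniteDuration,
      reply: Int => Reply): Outcome = {

    @tailrec
    def tick(attempts: Int, delay: FiniteDuration, elapsed: FiniteDuration): Outcome = {
      // The tick fires after waiting `delay`, so account for that wait first.
      val waited = elapsed + delay
      if (waited > maxDuration) {
        GaveUp(attempts, waited) // corresponds to `self ! PoisonPill` in the diff
      } else {
        val attempt = attempts + 1
        reply(attempt) match {
          // AlreadyRegistered finishes the registration, as in the diff.
          case Acknowledge | AlreadyRegistered => Registered(attempt, waited)
          case Refuse(reason)                  => Refused(reason)
          case NoReply                         => tick(attempt, delay * 2, waited)
        }
      }
    }

    tick(attempts = 0, delay = initialDelay, elapsed = Duration.Zero)
  }
}
```

With an initial delay of 1 second, a 10-second limit and no reply ever arriving, the attempts happen after 1, 3 and 7 seconds of accumulated waiting; the next wait would bring the total to 15 seconds, so `RegistrationBackoff.register(1.second, 10.seconds, _ => RegistrationBackoff.NoReply)` returns `GaveUp(3, 15.seconds)`. The follow-up question in the issue would correspond to dropping the `maxDuration` check and instead capping the individual delay (for example at a few minutes), so that the TaskManager keeps retrying indefinitely.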