[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882740#comment-15882740 ]
ASF GitHub Bot commented on FLINK-4364: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r102945494 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -105,6 +110,105 @@ @Rule public TestName name = new TestName(); + @Test + public void testHeartbeatTimeoutWithJobManager() throws Exception { + final JobID jobId = new JobID(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID tmResourceId = new ResourceID("tm"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class)); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(); + haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService); + haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final long heartbeatTimeout = 1000L; + final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + tmResourceId, + rpc.getExecutor(), + Executors.newSingleThreadScheduledExecutor(), + log); + + final String jobMasterAddress = "jm"; + final UUID jmLeaderId = UUID.randomUUID(); + 
final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + final int blobPort = 42; + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(taskManagerLocation), + eq(jmLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress); + + try { + final TaskExecutor taskManager = new TaskExecutor( + tmConfig, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + tmHeartbeatManager, + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + new JobManagerTable(), + jobLeaderService, + testingFatalErrorHandler); + + taskManager.start(); + + rpc.registerGateway(jobMasterAddress, jobMasterGateway); + + // we have to add the job after the TaskExecutor, because otherwise the service has not + // been properly started. 
+ jobLeaderService.addJob(jobId, jobMasterAddress); + + // now inform the task manager about the new job leader + jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId); + + // register task manager success will trigger monitoring heartbeat target between tm and jm + verify(jobMasterGateway).registerTaskManager( + eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class)); + + final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets"); + final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable"); + final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections"); + + // before heartbeat timeout + assertTrue(heartbeatTargets.containsKey(jmResourceId)); + assertTrue(jobManagerTable.contains(jobId)); + assertTrue(jobManagerConnections.containsKey(jmResourceId)); + + // the job manager will not schedule heartbeat because of mock and the task manager will be notified heartbeat timeout + Thread.sleep(heartbeatTimeout); + + // after heartbeat timeout + assertFalse(jobManagerTable.contains(jobId)); + assertFalse(jobManagerConnections.containsKey(jmResourceId)); + verify(jobMasterGateway).disconnectTaskManager(eq(tmResourceId)); --- End diff -- It would be better to put the timeout on this verification (e.g. `verify(jobMasterGateway, timeout(...))`) instead of using a fixed `Thread.sleep`. That way the test waits at most a given amount of time for the `disconnectTaskManager` call to happen, rather than assuming it has already happened after the sleep. > Implement TaskManager side of heartbeat from JobManager > ------------------------------------------------------- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhijiang > Assignee: zhijiang > > The {{JobManager}} initiates heartbeat messages identified by (JobID, JmLeaderID), and > the {{TaskManager}} reports its metrics info in response to each heartbeat. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)