[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882740#comment-15882740 ]
ASF GitHub Bot commented on FLINK-4364:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945494
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -105,6 +110,105 @@
        @Rule
        public TestName name = new TestName();
     
    +   @Test
    +   public void testHeartbeatTimeoutWithJobManager() throws Exception {
    +           final JobID jobId = new JobID();
    +           final Configuration configuration = new Configuration();
    +           final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
    +           final ResourceID tmResourceId = new ResourceID("tm");
    +           final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +           final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
    +
    +           final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +           final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
    +           final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +           final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +           final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +           haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
    +           haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +           final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +           final long heartbeatTimeout = 1000L;
    +           final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager = new HeartbeatManagerImpl<>(
    +                           heartbeatTimeout,
    +                           tmResourceId,
    +                           rpc.getExecutor(),
    +                           Executors.newSingleThreadScheduledExecutor(),
    +                           log);
    +
    +           final String jobMasterAddress = "jm";
    +           final UUID jmLeaderId = UUID.randomUUID();
    +           final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
    +           final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
    +           final int blobPort = 42;
    +
    +           when(jobMasterGateway.registerTaskManager(
    +                           any(String.class),
    +                           eq(taskManagerLocation),
    +                           eq(jmLeaderId),
    +                           any(Time.class)
    +           )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +           when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
    +
    +           try {
    +                   final TaskExecutor taskManager = new TaskExecutor(
    +                                   tmConfig,
    +                                   taskManagerLocation,
    +                                   rpc,
    +                                   mock(MemoryManager.class),
    +                                   mock(IOManager.class),
    +                                   mock(NetworkEnvironment.class),
    +                                   haServices,
    +                                   mock(MetricRegistry.class),
    +                                   tmHeartbeatManager,
    +                                   mock(TaskManagerMetricGroup.class),
    +                                   mock(BroadcastVariableManager.class),
    +                                   mock(FileCache.class),
    +                                   taskSlotTable,
    +                                   new JobManagerTable(),
    +                                   jobLeaderService,
    +                                   testingFatalErrorHandler);
    +
    +                   taskManager.start();
    +
    +                   rpc.registerGateway(jobMasterAddress, jobMasterGateway);
    +
    +                   // we have to add the job after the TaskExecutor, because otherwise the service has not
    +                   // been properly started.
    +                   jobLeaderService.addJob(jobId, jobMasterAddress);
    +
    +                   // now inform the task manager about the new job leader
    +                   jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
    +
    +                   // register task manager success will trigger monitoring heartbeat target between tm and jm
    +                   verify(jobMasterGateway).registerTaskManager(
    +                                   eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
    +
    +                   final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
    +                   final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
    +                   final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
    +
    +                   // before heartbeat timeout
    +                   assertTrue(heartbeatTargets.containsKey(jmResourceId));
    +                   assertTrue(jobManagerTable.contains(jobId));
    +                   assertTrue(jobManagerConnections.containsKey(jmResourceId));
    +
    +                   // the job manager will not schedule heartbeat because of mock and the task manager will be notified heartbeat timeout
    +                   Thread.sleep(heartbeatTimeout);
    +
    +                   // after heartbeat timeout
    +                   assertFalse(jobManagerTable.contains(jobId));
    +                   assertFalse(jobManagerConnections.containsKey(jmResourceId));
    +                   verify(jobMasterGateway).disconnectTaskManager(eq(tmResourceId));
    --- End diff --
    
    Better to introduce a timeout here. That way we wait up to a given time for the `disconnectTaskManager` call to have happened.
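
    A minimal, library-free sketch of the idea, not the code from the pull request: rather than sleeping for a fixed interval and then asserting, the test blocks until the call has been observed or an upper bound has elapsed. `RecordingGateway`, `disconnectObserved` and the string resource id are hypothetical names used only for illustration; the scheduled task stands in for the heartbeat timeout firing on another thread. In a Mockito-based test a similar effect is usually obtained by passing a `timeout(...)` verification mode to `verify`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeoutWaitSketch {

    /** Hypothetical stand-in for the mocked gateway; it only records the disconnect call. */
    static final class RecordingGateway {
        private final CountDownLatch disconnected = new CountDownLatch(1);

        void disconnectTaskManager(String resourceId) {
            disconnected.countDown();
        }

        /** Blocks until the call was observed or maxWaitMillis elapsed, whichever comes first. */
        boolean awaitDisconnect(long maxWaitMillis) throws InterruptedException {
            return disconnected.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Simulates a heartbeat timeout that fires after heartbeatTimeoutMillis on another
     * thread and reports whether the disconnect call was seen within maxWaitMillis.
     */
    static boolean disconnectObserved(long heartbeatTimeoutMillis, long maxWaitMillis)
            throws InterruptedException {
        RecordingGateway gateway = new RecordingGateway();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(
                    () -> gateway.disconnectTaskManager("tm"),
                    heartbeatTimeoutMillis,
                    TimeUnit.MILLISECONDS);
            // Returns as soon as the call happens; fails only after the full bound.
            return gateway.awaitDisconnect(maxWaitMillis);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(disconnectObserved(100L, 5_000L));
    }
}
```

    With a generous bound the check returns as soon as the call arrives, so the test is not slowed down in the common case and does not fail merely because the timeout fired slightly later than the sleep interval.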


> Implement TaskManager side of heartbeat from JobManager
> -------------------------------------------------------
>
>                 Key: FLINK-4364
>                 URL: https://issues.apache.org/jira/browse/FLINK-4364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



