GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r256289829
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ########## @@ -1430,6 +1430,120 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception } } + @Nonnull + private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + return testingResourceManagerGateway; + } + + /** + * Tests that the job execution is failed if the TaskExecutor disconnects from the + * JobMaster. + */ + @Test + public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { + runJobFailureWhenTaskExecutorTerminatesTest( + () -> heartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> jobMasterGateway.disconnectTaskManager( + localTaskManagerLocation.getResourceID(), + new FlinkException("Test disconnectTaskManager exception.")), + (jobMasterGateway, resourceID) -> ignored -> {}); + } + + @Test + public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { + final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true); + runJobFailureWhenTaskExecutorTerminatesTest( + () -> fastHeartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> respondToHeartbeats.set(false), + (jobMasterGateway, taskManagerResourceId) -> resourceId -> { + if (respondToHeartbeats.get()) { + jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new AccumulatorReport(Collections.emptyList())); + } + } + ); + } + + private void runJobFailureWhenTaskExecutorTerminatesTest( + Supplier<HeartbeatServices> heartbeatSupplier, + BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState, + BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception { + final JobGraph jobGraph = createSingleVertexJobGraph(); + final TestingOnCompletionActions onCompletionActions = new TestingOnCompletionActions(); + final JobMaster jobMaster = createJobMaster( + new Configuration(), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatSupplier.get(), + onCompletionActions); + + createAndRegisterTestingResourceManagerGateway(); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway, taskManagerLocation.getResourceID())) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + + jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN); + final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get(); + + assertThat(slotOffers, hasSize(1)); + + final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get(); + + jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), executionAttemptId, ExecutionState.RUNNING)).get(); + + jobReachedRunningState.accept(taskManagerLocation, jobMasterGateway); + + final ArchivedExecutionGraph archivedExecutionGraph = onCompletionActions.getJobReachedGloballyTerminalStateFuture().get(); + + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); Review comment: If you deem the method readable, leave it as is. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services