GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r256290442
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ########## @@ -1430,6 +1422,125 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception } } + @Nonnull + private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + return testingResourceManagerGateway; + } + + /** + * Tests that the job execution is failed if the TaskExecutor disconnects from the + * JobMaster. + */ + @Test + public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { + runJobFailureWhenTaskExecutorTerminatesTest( + () -> heartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> jobMasterGateway.disconnectTaskManager( + localTaskManagerLocation.getResourceID(), + new FlinkException("Test disconnectTaskManager exception.")), + (jobMasterGateway, resourceID) -> ignored -> {}); + } + + @Test + public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { + final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true); + runJobFailureWhenTaskExecutorTerminatesTest( + () -> fastHeartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> respondToHeartbeats.set(false), + (jobMasterGateway, taskManagerResourceId) -> resourceId -> { + if (respondToHeartbeats.get()) { + jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new AccumulatorReport(Collections.emptyList())); + } + } + ); + } + + private void runJobFailureWhenTaskExecutorTerminatesTest( + Supplier<HeartbeatServices> heartbeatSupplier, + BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState, + BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception { + final JobGraph jobGraph = 
createSingleVertexJobGraph(); + final TestingOnCompletionActions onCompletionActions = new TestingOnCompletionActions(); + final JobMaster jobMaster = createJobMaster( + new Configuration(), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatSupplier.get(), + onCompletionActions); + + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway, taskManagerLocation.getResourceID())) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + + jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + + offerSingleSlot(jobMasterGateway, taskManagerLocation); + + final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get(); + + jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), executionAttemptId, ExecutionState.RUNNING)).get(); + + jobReachedRunningState.accept(taskManagerLocation, jobMasterGateway); + + final ArchivedExecutionGraph archivedExecutionGraph = 
onCompletionActions.getJobReachedGloballyTerminalStateFuture().get(); + + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + private void offerSingleSlot(JobMasterGateway jobMasterGateway, LocalTaskManagerLocation taskManagerLocation) throws InterruptedException, ExecutionException { + final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN); + final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get(); + + assertThat(slotOffers, hasSize(1)); + } + + private static final class TestingOnCompletionActions implements OnCompletionActions { + + private final CompletableFuture<ArchivedExecutionGraph> jobReachedGloballyTerminalStateFuture = new CompletableFuture<>(); + private final CompletableFuture<Void> jobFinishedByOtherFuture = new CompletableFuture<>(); Review comment: Is this unused field (`jobFinishedByOtherFuture`) intended for future extensions? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services