gaoyunhaii commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r297001645
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##########
 @@ -1788,6 +1848,108 @@ private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGat
                return testingResourceManagerGateway;
        }
 
+       @Test
+       public void testPartitionTableCleanupOnDisconnect() throws Exception {
+               final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+               final JobGraph jobGraph = createSingleVertexJobGraph();
+
+               final ResultPartitionID resultPartitionId = new ResultPartitionID();
+               final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+               final PartitionTable<ResourceID> partitionTable = new PartitionTable<>();
+               partitionTable.startTrackingPartition(taskManagerLocation.getResourceID(), resultPartitionId);
+
+               final JobMaster jobMaster = new JobMasterBuilder()
+                       .withConfiguration(configuration)
+                       .withJobGraph(jobGraph)
+                       .withHighAvailabilityServices(haServices)
+                       .withJobManagerSharedServices(jobManagerSharedServices)
+                       .withHeartbeatServices(heartbeatServices)
+                       .withOnCompletionActions(new TestingOnCompletionActions())
+                       .withPartitionTable(partitionTable)
+                       .createJobMaster();
+
+               final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                       .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID))
+                       .createTestingTaskExecutorGateway();
+
+               try {
+                       jobMaster.start(jobMasterId).get();
+
+                       final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       // register a slow to establish a connection
 
 Review comment:
   Should this be "register a _slot_ to ..."?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
