Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r695574774



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
##########
@@ -136,7 +138,24 @@ public void toggleVertexFinishedUnfinished() {
         regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
 
         final List<IntermediateResultPartitionID> partitionsToRelease =
-                regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+                getReleasablePartitions(regionPartitionReleaseStrategy, consumerVertex1);
         assertThat(partitionsToRelease, is(empty()));
     }
+
+    private static List<IntermediateResultPartitionID> getReleasablePartitions(
+            final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy,
+            final ExecutionVertexID finishedVertex) {
+
+        final List<IntermediateResultPartitionID> releasablePartitions = new ArrayList<>();

Review comment:
       Thank you for providing a better implementation. I had overlooked that 
`IterableUtils` can be used here.
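
For illustration only, here is a minimal sketch of the kind of flattening such a test helper might do: turning several groups of partition IDs into one list by streaming over each `Iterable`. Everything in it is an assumption made for the example. The class name, the `toStream` helper (which only loosely mimics what a utility like `IterableUtils` could offer), and the use of plain strings in place of `IntermediateResultPartitionID` are hypothetical stand-ins, not the code from this pull request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for the test helper; not the code from the PR.
public class ReleasablePartitionsSketch {

    // Assumed helper: turns any Iterable into a sequential Stream.
    static <E> Stream<E> toStream(Iterable<E> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    // Flattens groups of partition IDs (plain strings here) into one list,
    // preserving the order of groups and of elements within each group.
    static List<String> flatten(List<Iterable<String>> groups) {
        return groups.stream()
                .flatMap(group -> toStream(group))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Iterable<String>> groups =
                Arrays.asList(Arrays.asList("p1", "p2"), Collections.singletonList("p3"));
        System.out.println(flatten(groups));
    }
}
```

Compared with filling an `ArrayList` in nested loops, the stream form keeps the helper to a single expression.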

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
##########
@@ -158,25 +159,122 @@ private void testCacheRemovedCorrectlyAfterFailover(
         assertEquals(PARALLELISM, shuffleDescriptors.length);
         assertEquals(expectedBefore, blobWriter.numberOfBlobs());
 
-        triggerExceptionAndComplete(scheduler, v1);
+        triggerGlobalFailoverAndComplete(scheduler, v1);
         ioExecutor.triggerAll();
 
         // Cache should be removed during ExecutionVertex#resetForNewExecution
         assertNull(getConsumedCachedShuffleDescriptor(executionGraph, v2));
         assertEquals(expectedAfter, blobWriter.numberOfBlobs());
     }
 
-    private static DefaultScheduler createSchedulerAndDeploy(
+    @Test
+    public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
+        testRemoveCacheForPointwiseEdgeAfterFinished(
+                new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+    }
+
+    @Test
+    public void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
+        testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 7, 6);

Review comment:
       Agreed. I've added a comment here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) {
         final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitions =

Review comment:
       Resolved. Furthermore, should we rename the variable 
`releasablePartitions` to `releasablePartitionGroups`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
##########
@@ -36,9 +36,9 @@
      * Calling this method informs the strategy that a vertex finished.
      *
      * @param finishedVertex Id of the vertex that finished the execution
-     * @return A list of result partitions that can be released
+     * @return A list of {@link ConsumedPartitionGroup}s that can be released

Review comment:
       Thank you for pointing this out.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) {
         final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
 
         if (attempt.getState() == ExecutionState.FINISHED) {
-            final List<IntermediateResultPartitionID> releasablePartitions =
+            final List<ConsumedPartitionGroup> releasablePartitions =
                     partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
             releasePartitions(releasablePartitions);
         } else {
             partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
         }
     }
 
-    private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
-        if (releasablePartitions.size() > 0) {
+    private void releasePartitions(final List<ConsumedPartitionGroup> releasablePartitionGroups) {
+        if (releasablePartitionGroups.size() > 0) {
 
             // Remove cached ShuffleDescriptor when partition is released
-            releasablePartitions.stream()
+            releasablePartitionGroups.stream()
+                    .map(ConsumedPartitionGroup::getFirst)
                     .map(IntermediateResultPartitionID::getIntermediateDataSetID)
                     .distinct()
                     .map(intermediateResults::get)
                     .forEach(IntermediateResult::notifyPartitionChanged);
 
-            final List<ResultPartitionID> partitionIds =
-                    releasablePartitions.stream()
-                            .map(this::createResultPartitionId)
-                            .collect(Collectors.toList());
+            final List<ResultPartitionID> releasablePartitionIds = new ArrayList<>();

Review comment:
       Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


