Thesharing commented on a change in pull request #16856: URL: https://github.com/apache/flink/pull/16856#discussion_r695574774
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java ########## @@ -136,7 +138,24 @@ public void toggleVertexFinishedUnfinished() { regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2); final List<IntermediateResultPartitionID> partitionsToRelease = - regionPartitionReleaseStrategy.vertexFinished(consumerVertex1); + getReleasablePartitions(regionPartitionReleaseStrategy, consumerVertex1); assertThat(partitionsToRelease, is(empty())); } + + private static List<IntermediateResultPartitionID> getReleasablePartitions( + final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy, + final ExecutionVertexID finishedVertex) { + + final List<IntermediateResultPartitionID> releasablePartitions = new ArrayList<>(); Review comment: Thank you for providing a better implementation. I had overlooked that `IterableUtils` can be used here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java ########## @@ -158,25 +159,122 @@ private void testCacheRemovedCorrectlyAfterFailover( assertEquals(PARALLELISM, shuffleDescriptors.length); assertEquals(expectedBefore, blobWriter.numberOfBlobs()); - triggerExceptionAndComplete(scheduler, v1); + triggerGlobalFailoverAndComplete(scheduler, v1); ioExecutor.triggerAll(); // Cache should be removed during ExecutionVertex#resetForNewExecution assertNull(getConsumedCachedShuffleDescriptor(executionGraph, v2)); assertEquals(expectedAfter, blobWriter.numberOfBlobs()); } - private static DefaultScheduler createSchedulerAndDeploy( + @Test + public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception { + testRemoveCacheForPointwiseEdgeAfterFinished( + new TestingBlobWriter(Integer.MAX_VALUE), 0, 0); + } + + @Test + public void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception { + testRemoveCacheForPointwiseEdgeAfterFinished(new 
TestingBlobWriter(0), 7, 6); Review comment: Agreed. I've added a comment here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java ########## @@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) { final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); if (attempt.getState() == ExecutionState.FINISHED) { - final List<IntermediateResultPartitionID> releasablePartitions = + final List<ConsumedPartitionGroup> releasablePartitions = Review comment: Resolved. Furthermore, should we rename the local variable `releasablePartitions` to `releasablePartitionGroups`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java ########## @@ -36,9 +36,9 @@ * Calling this method informs the strategy that a vertex finished. * * @param finishedVertex Id of the vertex that finished the execution - * @return A list of result partitions that can be released + * @return A list of {@link ConsumedPartitionGroup}s that can be released Review comment: Thank you for pointing this out. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java ########## @@ -1266,30 +1267,34 @@ private void maybeReleasePartitions(final Execution attempt) { final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); if (attempt.getState() == ExecutionState.FINISHED) { - final List<IntermediateResultPartitionID> releasablePartitions = + final List<ConsumedPartitionGroup> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex); releasePartitions(releasablePartitions); } else { partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex); } } - private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) { - if (releasablePartitions.size() > 0) { + private void releasePartitions(final List<ConsumedPartitionGroup> releasablePartitionGroups) { + if (releasablePartitionGroups.size() > 0) { // Remove cached ShuffleDescriptor when partition is released - releasablePartitions.stream() + releasablePartitionGroups.stream() + .map(ConsumedPartitionGroup::getFirst) .map(IntermediateResultPartitionID::getIntermediateDataSetID) .distinct() .map(intermediateResults::get) .forEach(IntermediateResult::notifyPartitionChanged); - final List<ResultPartitionID> partitionIds = - releasablePartitions.stream() - .map(this::createResultPartitionId) - .collect(Collectors.toList()); + final List<ResultPartitionID> releasablePartitionIds = new ArrayList<>(); Review comment: Resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org