reswqa commented on code in PR #23565: URL: https://github.com/apache/flink/pull/23565#discussion_r1377202984
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java: ########## @@ -299,6 +299,24 @@ else if (currentBackoff < maxBackoff) { return false; } + /** + * The remote task manager creates partition request listener and returns {@link + * PartitionNotFoundException} until the listener is timeout, so the backoff should add the + * timeout milliseconds if it exists. + * + * @param timeoutMS The timeout milliseconds that the partition request listener timeout + * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>. + */ + protected boolean increaseBackoff(int timeoutMS) { Review Comment: What's the purpose of this method here? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java: ########## @@ -40,9 +51,35 @@ public class ResultPartitionManager implements ResultPartitionProvider { private final Map<ResultPartitionID, ResultPartition> registeredPartitions = CollectionUtil.newHashMapWithExpectedSize(16); + private final Map<ResultPartitionID, PartitionRequestListenerManager> listenerManagers = Review Comment: ```suggestion @GuardedBy("registeredPartitions") private final Map<ResultPartitionID, PartitionRequestListenerManager> listenerManagers = ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ########## @@ -78,27 +84,47 @@ class CreditBasedSequenceNumberingViewReader } @Override - public void requestSubpartitionView( + public void requestSubpartitionViewOrRegisterListener( ResultPartitionProvider partitionProvider, ResultPartitionID resultPartitionId, int subPartitionIndex) throws IOException { - synchronized (requestLock) { - if (subpartitionView == null) { - // This call can trigger a notification we have to - // schedule a separate task at the event loop that will - // start consuming this. 
Otherwise the reference to the - // view cannot be available in getNextBuffer(). - this.subpartitionView = - partitionProvider.createSubpartitionView( - resultPartitionId, subPartitionIndex, this); + checkState(subpartitionView == null, "Subpartition already requested"); + checkState( + partitionRequestListener == null, "Partition request notifier already created"); Review Comment: ```suggestion partitionRequestListener == null, "Partition request listener already created"); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java: ########## @@ -55,13 +62,95 @@ public void testCreateViewForRegisteredPartition() throws Exception { partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); } + /** + * {@link ResultPartitionManager} creates subpartition view reader after the given partition is + * registered. + */ + @Test + void testCreateSubpartitionViewAfterRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue(); + + partitionManager.registerResultPartition(partition); + PartitionRequestListener partitionRequestListener = + TestingPartitionRequestListener.newBuilder().build(); + assertThat( + partitionManager + .createSubpartitionViewOrRegisterListener( + partition.getPartitionId(), + 0, + new NoOpBufferAvailablityListener(), + partitionRequestListener) + .isPresent()) + .isTrue(); + assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue(); + } + + /** + * The {@link ResultPartitionManager} registers {@link PartitionRequestListener} before specify + * {@link ResultPartition} is registered. When the {@link ResultPartition} is registered, the + * {@link ResultPartitionManager} will find the listener and create partition view reader. 
an + */ + @Test + void testRegisterPartitionListenerBeforeRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue(); + + final CompletableFuture<ResultPartition> notifySubpartitionCreatedFuture = + new CompletableFuture<>(); + PartitionRequestListener partitionRequestListener = + TestingPartitionRequestListener.newBuilder() + .setResultPartitionId(partition.getPartitionId()) + .setNetworkSequenceViewReader( + TestingSubpartitionCreatedViewReader.newBuilder() + .setNotifySubpartitionCreatedConsumer( + tuple -> + notifySubpartitionCreatedFuture.complete( + tuple.f0)) + .build()) + .build(); + assertThat( Review Comment: IIRC, `AssertJ` has assertions for `Optional`, like `assertThat(xxx).isNotPresent()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org