reswqa commented on code in PR #23565:
URL: https://github.com/apache/flink/pull/23565#discussion_r1377202984


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -299,6 +299,24 @@ else if (currentBackoff < maxBackoff) {
         return false;
     }
 
+    /**
+     * The remote task manager creates partition request listener and returns 
{@link
+     * PartitionNotFoundException} until the listener is timeout, so the 
backoff should add the
+     * timeout milliseconds if it exists.
+     *
+     * @param timeoutMS The timeout milliseconds that the partition request 
listener timeout
+     * @return <code>true</code>, iff the operation was successful. Otherwise, 
<code>false</code>.
+     */
+    protected boolean increaseBackoff(int timeoutMS) {

Review Comment:
   What's the purpose of this method here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java:
##########
@@ -40,9 +51,35 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
     private final Map<ResultPartitionID, ResultPartition> registeredPartitions 
=
             CollectionUtil.newHashMapWithExpectedSize(16);
 
+    private final Map<ResultPartitionID, PartitionRequestListenerManager> 
listenerManagers =

Review Comment:
   ```suggestion
       @GuardedBy("registeredPartitions")
       private final Map<ResultPartitionID, PartitionRequestListenerManager> 
listenerManagers =
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -78,27 +84,47 @@ class CreditBasedSequenceNumberingViewReader
     }
 
     @Override
-    public void requestSubpartitionView(
+    public void requestSubpartitionViewOrRegisterListener(
             ResultPartitionProvider partitionProvider,
             ResultPartitionID resultPartitionId,
             int subPartitionIndex)
             throws IOException {
-
         synchronized (requestLock) {
-            if (subpartitionView == null) {
-                // This call can trigger a notification we have to
-                // schedule a separate task at the event loop that will
-                // start consuming this. Otherwise the reference to the
-                // view cannot be available in getNextBuffer().
-                this.subpartitionView =
-                        partitionProvider.createSubpartitionView(
-                                resultPartitionId, subPartitionIndex, this);
+            checkState(subpartitionView == null, "Subpartition already 
requested");
+            checkState(
+                    partitionRequestListener == null, "Partition request 
notifier already created");

Review Comment:
   ```suggestion
                       partitionRequestListener == null, "Partition request 
listener already created");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -55,13 +62,95 @@ public void testCreateViewForRegisteredPartition() throws 
Exception {
                 partition.getPartitionId(), 0, new 
NoOpBufferAvailablityListener());
     }
 
+    /**
+     * {@link ResultPartitionManager} creates subpartition view reader after 
the given partition is
+     * registered.
+     */
+    @Test
+    void testCreateSubpartitionViewAfterRegisteredPartition() throws Exception 
{
+        final ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue();
+
+        partitionManager.registerResultPartition(partition);
+        PartitionRequestListener partitionRequestListener =
+                TestingPartitionRequestListener.newBuilder().build();
+        assertThat(
+                        partitionManager
+                                .createSubpartitionViewOrRegisterListener(
+                                        partition.getPartitionId(),
+                                        0,
+                                        new NoOpBufferAvailablityListener(),
+                                        partitionRequestListener)
+                                .isPresent())
+                .isTrue();
+        assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue();
+    }
+
+    /**
+     * The {@link ResultPartitionManager} registers {@link 
PartitionRequestListener} before specify
+     * {@link ResultPartition} is registered. When the {@link ResultPartition} 
is registered, the
+     * {@link ResultPartitionManager} will find the listener and create 
partition view reader. an
+     */
+    @Test
+    void testRegisterPartitionListenerBeforeRegisteredPartition() throws 
Exception {
+        final ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        assertThat(partitionManager.getListenerManagers().isEmpty()).isTrue();
+
+        final CompletableFuture<ResultPartition> 
notifySubpartitionCreatedFuture =
+                new CompletableFuture<>();
+        PartitionRequestListener partitionRequestListener =
+                TestingPartitionRequestListener.newBuilder()
+                        .setResultPartitionId(partition.getPartitionId())
+                        .setNetworkSequenceViewReader(
+                                
TestingSubpartitionCreatedViewReader.newBuilder()
+                                        .setNotifySubpartitionCreatedConsumer(
+                                                tuple ->
+                                                        
notifySubpartitionCreatedFuture.complete(
+                                                                tuple.f0))
+                                        .build())
+                        .build();
+        assertThat(

Review Comment:
   IIRC, `AsserJ` has assertion for `Optional` like 
`assertThat(xxx).isNotPresent()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to