AHeise commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499424128



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
        }
 
        @Override
-       public CompletableFuture<?> readRecoveredState(ExecutorService 
executor, ChannelStateReader reader) {
+       public CompletableFuture<?> readRecoveredState(ExecutorService 
executor, ChannelStateReader reader) throws IOException {
+               synchronized (requestLock) {
+                       if (closeFuture.isDone()) {
+                               return FutureUtils.completedVoidFuture();
+                       }
+                       for (InputChannel inputChannel : 
inputChannels.values()) {
+                               if (inputChannel instanceof 
RemoteRecoveredInputChannel) {
+                                       ((RemoteRecoveredInputChannel) 
inputChannel).assignExclusiveSegments();
+                               }
+                       }
+               }
+

Review comment:
       I'll add. In short, the #number of required buffers is now higher than a 
few tests (and possibly production setups) assume. Without the lazy 
initialization, you cannot simulate backpressure in a few scenarios as easily.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() 
throws Exception {
                                }
                        };
 
-                       submitTasksAndWaitForResults(executor, new Callable[] 
{closeTask, readRecoveredStateTask, processStateTask});
-               } finally {
-                       executor.shutdown();
+                       executor.invokeAll(Arrays.asList(closeTask, 
readRecoveredStateTask, processStateTask));
+
                        // wait until the internal channel state recover task 
finishes
-                       executor.awaitTermination(60, TimeUnit.SECONDS);
                        assertEquals(totalBuffers, 
environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
                        assertTrue(inputGate.getCloseFuture().isDone());
-
-                       environment.close();

Review comment:
       `close` is called by the `Closer`.
   
   `shutdown` + `awaitTermination` is simply the wrong method. `invokeAll` is 
doing what was intended. Could be an extra commit. However, it should then 
probably be done on all 10 places that use `submitTasksAndWaitForResults`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
                        TaskEventPublisher taskEventPublisher,
                        int initialBackOff,
                        int maxBackoff,
+                       int networkBuffersPerChannel,
                        InputChannelMetrics metrics) {
-               super(inputGate, channelIndex, partitionId, initialBackOff, 
maxBackoff, metrics.getNumBytesInLocalCounter(), 
metrics.getNumBuffersInLocalCounter());
+               super(

Review comment:
       Hm you are right, it doesn't solve it completely after having read the 
ticket. However, without a solution for FLINK-13203, there will also not be a 
real solution here.
   On the other hand, it's inherently wrong to treat local and remote channels 
differently during recovery (they even share the same implementation). So this 
commit is still fixing the issue in a best effort manner and certainly helps to 
improve build stability, which is an improvement of its own.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

Review comment:
       I didn't even know that double-tags are a thing. :p




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to