pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r424678536



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -109,10 +109,17 @@ private static CheckpointBarrierHandler 
createCheckpointBarrierHandler(
                switch (config.getCheckpointMode()) {
                        case EXACTLY_ONCE:
                                if (config.isUnalignedCheckpointsEnabled()) {
-                                       return new CheckpointBarrierUnaligner(
-                                               
numberOfInputChannelsPerGate.toArray(),
-                                               channelStateWriter,
-                                               taskName,
+                                       return new 
AlternatingCheckpointBarrierHandler(
+                                               new CheckpointBarrierAligner(
+                                                       taskName,
+                                                       channelIndexToInputGate,
+                                                       
inputGateToChannelIndexOffset,
+                                                       toNotifyOnCheckpoint),
+                                               new CheckpointBarrierUnaligner(
+                                                       
numberOfInputChannelsPerGate.toArray(),
+                                                       channelStateWriter,
+                                                       taskName,
+                                                       toNotifyOnCheckpoint),

Review comment:
       There are two methods in `CheckpointedInputGate` which are implemented 
in not very object oriented design (I guess they should be re-implemented) that 
would stop working with this change (I hope some tests will fail because of 
that):
   ```
        public void spillInflightBuffers(
                        long checkpointId,
                        int channelIndex,
                        ChannelStateWriter channelStateWriter) throws 
IOException {
                if (((CheckpointBarrierUnaligner) 
barrierHandler).hasInflightData(checkpointId, channelIndex)) {
                        
inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId, 
channelStateWriter);
                }
        }
   
        public CompletableFuture<Void> getAllBarriersReceivedFuture(long 
checkpointId) {
                return ((CheckpointBarrierUnaligner) 
barrierHandler).getAllBarriersReceivedFuture(checkpointId);
        }
   ```
   I guess `hasInflightData()` and `getAllBarriersReceivedFuture()` should be 
pulled to `CheckpointBarrierHandler` interface (separate commit?)?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
                localChannel.resumeConsumption();
        }
 
+       @Test
+       public void testNoNotifyOnSavepoint() throws IOException {
+               TestBufferReceivedListener listener = new 
TestBufferReceivedListener();
+               LocalInputChannel channel = new LocalInputChannel(
+                       new SingleInputGateBuilder().build(),
+                       0,
+                       new ResultPartitionID(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new TestCounter(),
+                       new TestCounter());
+               CheckpointBarrier barrier = new CheckpointBarrier(123L, 123L, 
new CheckpointOptions(SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()));
+               channel.notifyPriorityEvent(new 
BufferConsumer(toBuffer(barrier).getMemorySegment(), 
FreeingBufferRecycler.INSTANCE, getDataType(barrier)));
+               channel.checkError();
+               assertTrue(listener.notifiedOnBarriers.isEmpty());

Review comment:
       frankly, it's not that unexpected that it's empty, since `listener` was 
never passed anywhere? 😈 
   
   (you forgot to register it to `SingleInputGate`?)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
                localChannel.resumeConsumption();
        }
 
+       @Test
+       public void testNoNotifyOnSavepoint() throws IOException {
+               TestBufferReceivedListener listener = new 
TestBufferReceivedListener();
+               LocalInputChannel channel = new LocalInputChannel(
+                       new SingleInputGateBuilder().build(),
+                       0,
+                       new ResultPartitionID(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new TestCounter(),
+                       new TestCounter());

Review comment:
       nit:
   ```
   InputChannelBuilder
     .newBuilder()
     .buildLocalChannel(new SingleInputGateBuilder().build())
   ```
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to