pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r487965745



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -147,6 +148,14 @@ else if (configuredInput instanceof SourceInputConfig) {
                return anyInputAvailable;
        }
 
+       @Override
+       public CompletableFuture<?> getPriorityEventAvailableFuture() {
+               return CompletableFuture.anyOf(
+                       Arrays.stream(inputProcessors)
+                               .map(inputProcessor -> 
inputProcessor.taskInput.getPriorityEventAvailableFuture())
+                               .toArray(CompletableFuture[]::new));

Review comment:
       This is an intermediate code that is being replaced later. Can you 
squash those changes?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -371,12 +371,16 @@ protected void 
processInput(MailboxDefaultAction.Controller controller) throws E
         */
        @VisibleForTesting
        CompletableFuture<?> getInputOutputJointFuture(InputStatus status) {
+               final CompletableFuture<?> priorityEventAvailableFuture = 
inputProcessor.getPriorityEventAvailableFuture();
                if (status == InputStatus.NOTHING_AVAILABLE && 
!recordWriter.isAvailable()) {
-                       return 
CompletableFuture.allOf(inputProcessor.getAvailableFuture(), 
recordWriter.getAvailableFuture());
+                       return CompletableFuture.anyOf(
+                               priorityEventAvailableFuture,
+                               
CompletableFuture.allOf(inputProcessor.getAvailableFuture(), 
recordWriter.getAvailableFuture()));
                } else if (status == InputStatus.NOTHING_AVAILABLE) {
-                       return inputProcessor.getAvailableFuture();
+                       return 
CompletableFuture.anyOf(inputProcessor.getAvailableFuture(),
+                               priorityEventAvailableFuture);
                } else {
-                       return recordWriter.getAvailableFuture();
+                       return 
CompletableFuture.anyOf(priorityEventAvailableFuture, 
recordWriter.getAvailableFuture());

Review comment:
       As we discussed online there is a bit of duplicated code here.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -232,19 +232,16 @@ public int getInputIndex() {
                        ChannelStateWriter channelStateWriter,
                        long checkpointId) throws IOException {
                for (int channelIndex = 0; channelIndex < 
recordDeserializers.length; channelIndex++) {
-                       final InputChannel channel = 
checkpointedInputGate.getChannel(channelIndex);
-
-                       // Assumption for retrieving buffers = one concurrent 
checkpoint
                        RecordDeserializer<?> deserializer = 
recordDeserializers[channelIndex];
                        if (deserializer != null) {
+                               final InputChannel channel = 
checkpointedInputGate.getChannel(channelIndex);
+
                                channelStateWriter.addInputData(
                                        checkpointId,
                                        channel.getChannelInfo(),
                                        
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                                        deserializer.getUnconsumedBuffer());
                        }
-
-                       
checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, 
channelStateWriter);

Review comment:
       Are you sure it's going in the right direction this change? Previously 
spilling was explicit on demand, now it's happening magically (implicitly) 
inside `LocalInputChannel`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                InputStatus inputStatus = 
inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                return inputSelectionHandler.updateStatus(inputStatus, 
readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture 
=
+                               
inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               
prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if 
(!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               
priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying 
StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               
prioritySelectionHandler.setAvailableInput(index);
+                               
priorityAvailability.getUnavailableToResetAvailable().complete(null);
+                       }, "priority event {}", index);

Review comment:
       nit: add `StreamMultipleInputProcessor` to the mail's name?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -113,7 +113,11 @@ public void processBarrier(CheckpointBarrier barrier, 
InputChannelInfo channelIn
 
                        if (++numBarriersReceived == numOpenChannels) {
                                allBarriersReceivedFuture.complete(null);
-                               resetPendingCheckpoint(barrierId);
+                               for (final InputGate gate : inputGates) {
+                                       for (int index = 0, numChannels = 
gate.getNumberOfInputChannels(); index < numChannels; index++) {
+                                               
gate.getChannel(index).checkpointStopped(currentCheckpointId);
+                                       }
+                               }

Review comment:
       What's the story behind this change? 
   1. it seems it differs only by a single line `numBarriersReceived = 0;`, so 
at the very least we should deduplicate some code here
   2. can you explain what's the functional change?
   3. aren't we missing a unit test for that? It would help answer point 2., 
and if there was a bug discovered in e2e test, it would be nice to have a 
faster unit test for that as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
                        buffers.add(bufferConsumer);
                        return false;
                }
-               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only 
one concurrent checkpoint in unaligned " +
-                       "checkpoints");
 
                final int pos = buffers.getNumPriorityElements();
                buffers.addPriorityElement(bufferConsumer);
 
-               boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
-               if (unalignedCheckpoint) {
+               CheckpointBarrier barrier = 
parseCheckpointBarrier(bufferConsumer);
+               if (barrier != null) {
+                       checkState(
+                               
barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+                               "Only unaligned checkpoints should be priority 
events");
                        final Iterator<BufferConsumer> iterator = 
buffers.iterator();
                        Iterators.advance(iterator, pos + 1);
+                       List<Buffer> inflightBuffers = new ArrayList<>();
                        while (iterator.hasNext()) {
                                BufferConsumer buffer = iterator.next();
 
                                if (buffer.isBuffer()) {
                                        try (BufferConsumer bc = buffer.copy()) 
{
-                                               
inflightBufferSnapshot.add(bc.build());
+                                               inflightBuffers.add(bc.build());
                                        }
                                }
                        }
+                       channelStateWriter.addOutputData(
+                               barrier.getId(),
+                               subpartitionInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               inflightBuffers.toArray(new Buffer[0]));
                }
                return pos == 0;
        }
 
-       private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
-               boolean unalignedCheckpoint;
+       @Nullable
+       private CheckpointBarrier parseCheckpointBarrier(BufferConsumer 
bufferConsumer) {

Review comment:
       This is again modifying code that I have already reviewed in the 
previous commit :( 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
                        buffers.add(bufferConsumer);
                        return false;
                }
-               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only 
one concurrent checkpoint in unaligned " +
-                       "checkpoints");
 
                final int pos = buffers.getNumPriorityElements();
                buffers.addPriorityElement(bufferConsumer);
 
-               boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
-               if (unalignedCheckpoint) {
+               CheckpointBarrier barrier = 
parseCheckpointBarrier(bufferConsumer);
+               if (barrier != null) {
+                       checkState(
+                               
barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+                               "Only unaligned checkpoints should be priority 
events");
                        final Iterator<BufferConsumer> iterator = 
buffers.iterator();
                        Iterators.advance(iterator, pos + 1);
+                       List<Buffer> inflightBuffers = new ArrayList<>();
                        while (iterator.hasNext()) {
                                BufferConsumer buffer = iterator.next();
 
                                if (buffer.isBuffer()) {
                                        try (BufferConsumer bc = buffer.copy()) 
{
-                                               
inflightBufferSnapshot.add(bc.build());
+                                               inflightBuffers.add(bc.build());
                                        }
                                }
                        }
+                       channelStateWriter.addOutputData(
+                               barrier.getId(),
+                               subpartitionInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               inflightBuffers.toArray(new Buffer[0]));

Review comment:
       Again I would point to the previous comment:
   https://github.com/apache/flink/pull/13228#discussion_r487038327
   
   ```
   Collection<Buffer> insertAsHeadAndGetInFlightData(checkpointBarrier)
   ```
   might be a better option. (It might not, as I haven't tried to implement it)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -89,17 +96,15 @@
        /** The number of available buffers that have not been announced to the 
producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
-       /**
-        * The latest already triggered checkpoint id which would be updated 
during
-        * {@link #spillInflightBuffers(long, ChannelStateWriter)}.
-        */
-       @GuardedBy("receivedBuffers")
-       private long lastRequestedCheckpointId = -1;
+       private final BufferManager bufferManager;
 
-       /** The current received checkpoint id from the network. */
-       private long receivedCheckpointId = -1;
+       /** Stores #overtaken buffers when a checkpoint barrier is received 
before task thread started checkpoint. */
+       @GuardedBy("receivedBuffers")
+       private Map<Long, Integer> numBuffersOvertaken = new HashMap<>();
 
-       private final BufferManager bufferManager;
+       /** All started checkpoints where a barrier has not been received yet. 
*/
+       @GuardedBy("receivedBuffers")
+       private Deque<Long> pendingCheckpointBarriers = new ArrayDeque<>(2);

Review comment:
       I don't think it's worth complicating the code with support for multiple 
concurrent checkpoints. It's not likely to be implemented soon, if ever.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, 
ChannelStateWriter channelSt
                }
 
                Buffer buffer = next.buffer();
-               CheckpointBarrier notifyReceivedBarrier = 
parseCheckpointBarrierOrNull(buffer);
-               if (notifyReceivedBarrier != null) {
-                       receivedCheckpointId = notifyReceivedBarrier.getId();
-               } else if (receivedCheckpointId < lastRequestedCheckpointId && 
buffer.isBuffer()) {
-                       
inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(),
 channelInfo);
-               }
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
+               if (buffer.isBuffer()) {
+                       for (final long barrierId : pendingCheckpointBarriers) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       getChannelInfo(),
+                                       
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));

Review comment:
       Are you spilling only on polling the buffer? (I think 
`RemoteInputChannel` is working better in this regard)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                InputStatus inputStatus = 
inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                return inputSelectionHandler.updateStatus(inputStatus, 
readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture 
=
+                               
inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               
prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if 
(!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               
priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying 
StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               
prioritySelectionHandler.setAvailableInput(index);
+                               
priorityAvailability.getUnavailableToResetAvailable().complete(null);

Review comment:
       Can not you maybe handle the priority message directly here, in this 
mail? Instead of relaying on the `processDefaultAction` to pick this up? 
   
   (I'm asking/loudly thinking)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -314,6 +315,21 @@ protected StreamTask(
                }
 
                this.channelIOExecutor = Executors.newSingleThreadExecutor(new 
ExecutorThreadFactory("channel-state-unspilling"));
+
+               injectChannelStateWriterIntoChannels();
+       }
+
+       private void injectChannelStateWriterIntoChannels() {
+               final Environment env = getEnvironment();
+               final ChannelStateWriter channelStateWriter = 
subtaskCheckpointCoordinator.getChannelStateWriter();
+               for (final InputGate gate : env.getAllInputGates()) {
+                       gate.setChannelStateWriter(channelStateWriter);
+               }
+               for (ResultPartitionWriter writer : env.getAllWriters()) {
+                       if (writer instanceof ChannelStateHolder) {
+                               ((ChannelStateHolder) 
writer).setChannelStateWriter(channelStateWriter);
+                       }
+               }

Review comment:
       ❤️ 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -22,13 +22,10 @@
 import org.apache.flink.annotation.VisibleForTesting;

Review comment:
       Is the code in a working state before:
   > Remove synchronization from CheckpointBarrierUnaligner.
   
   commit? It looks like data are spilled in two places, right?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
##########
@@ -122,8 +121,8 @@ public void close() throws Exception {
        }
 
        @Override
-       public void registerBufferReceivedListener(BufferReceivedListener 
listener) {
-               inputGate.registerBufferReceivedListener(listener);
+       public CompletableFuture<?> getPriorityEventAvailableFuture() {
+               return inputGate.getPriorityEventAvailableFuture();

Review comment:
       A minor rebasing/squashing mistake?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -159,10 +158,12 @@ public InputStatus emitNext(DataOutput<T> output) throws 
Exception {
                        if (bufferOrEvent.isPresent()) {
                                // return to the mailbox after receiving a 
checkpoint barrier to avoid processing of
                                // data after the barrier before checkpoint is 
performed for unaligned checkpoint mode
-                               if (bufferOrEvent.get().isEvent() && 
bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
+                               if (bufferOrEvent.get().isBuffer()) {
+                                       processBuffer(bufferOrEvent.get());
+                               } else {
+                                       processEvent(bufferOrEvent.get());

Review comment:
       Could you ether pull it to another commit or revert? This
   > Use futures to listen to priority events and handle them in 
StreamTaskNetworkInput.
   
   commit has a couple of other irrelevant changes

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
##########
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws 
Exception {
                inputGate = createBarrierBuffer(2, sequence, validator);

Review comment:
       Why has this test and `CheckpointBarrierTrackerTest.java` changed in 
this commit? Rebasing/squashing mistake, or am I missing something about this 
commit (I thought it's a pure clean up without functional changes).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, 
int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && 
receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = 
buffer.retainBuffer();
+                               AbstractEvent priorityEvent = 
parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       
receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = 
receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof 
CheckpointBarrier) {
+                                               final long barrierId = 
((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers 
for this checkpoint
+                                               if 
(!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not 
yet started by task thread,
+                                                       // so remember the 
numbers of buffers to spill for the time when it will be started
+                                                       
numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : 
pendingCheckpointBarriers) {
+                                                       
channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }
                                }
-                               notifyReceivedBarrier = listener != null ? 
parseCheckpointBarrierOrNull(buffer) : null;
                        }
                        recycleBuffer = false;
 
                        ++expectedSequenceNumber;
 
+                       if (firstPriorityEvent) {
+                               notifyPriorityEvent();
+                       }
                        if (wasEmpty) {
                                notifyChannelNonEmpty();
                        }
 
                        if (backlog >= 0) {
                                onSenderBacklog(backlog);
                        }
-
-                       if (notifyReceivedBarrier != null) {
-                               receivedCheckpointId = 
notifyReceivedBarrier.getId();
-                               if (notifyReceivedBarrier.isCheckpoint()) {
-                                       
listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
-                               }
-                       } else if (notifyReceivedBuffer != null) {
-                               
listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
-                       }
                } finally {
                        if (recycleBuffer) {
                                buffer.recycleBuffer();
                        }
                }
        }
 
+       /**
+        * Spills all queued buffers on checkpoint start. If barrier has 
already been received (and reordered), spill only
+        * the overtaken buffers.
+        */
+       public void checkpointStarted(CheckpointBarrier barrier) {
+               checkState(channelStateWriter != null, "Channel state writer 
not injected");
+               synchronized (receivedBuffers) {
+                       final Integer numBuffers = 
numBuffersOvertaken.get(barrier.getId());
+                       if (numBuffers != null) {
+                               // already received barrier before the task 
thread picked up the barrier of this or another channel
+                               spillBuffers(barrier.getId(), numBuffers);
+                       } else {
+                               // barrier not yet received, spill all current 
and future buffers
+                               spillBuffers(barrier.getId(), 
receivedBuffers.getNumUnprioritizedElements());
+                               pendingCheckpointBarriers.add(barrier.getId());
+                       }
+               }
+       }
+
+       public void checkpointStopped(long checkpointId) {
+               synchronized (receivedBuffers) {
+                       numBuffersOvertaken.remove(checkpointId);
+                       pendingCheckpointBarriers.remove(checkpointId);
+               }
+       }
+
+       private void spillBuffers(long checkpointId, int numBuffers) {

Review comment:
       nitty nit: `writeInFlightBuffers`? (`write` as Romand picked `writer` 
for the "spilling" nomenclature)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, 
int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && 
receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = 
buffer.retainBuffer();
+                               AbstractEvent priorityEvent = 
parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       
receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = 
receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof 
CheckpointBarrier) {
+                                               final long barrierId = 
((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers 
for this checkpoint
+                                               if 
(!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not 
yet started by task thread,
+                                                       // so remember the 
numbers of buffers to spill for the time when it will be started
+                                                       
numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : 
pendingCheckpointBarriers) {
+                                                       
channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }

Review comment:
       This method has grown too large.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to