AHeise commented on a change in pull request #13562:
URL: https://github.com/apache/flink/pull/13562#discussion_r501721584



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -76,7 +76,7 @@
 
        private final CachingCheckpointStorageWorkerView checkpointStorage;
        private final String taskName;
-       private final ExecutorService executorService;

Review comment:
       Is that change really necessary? The Javadoc of `Executor` explicitly 
states that this is thread-independent:
   
   > An object that executes submitted Runnable tasks. This interface provides 
a way of decoupling task submission from the mechanics of how each task will be 
run, including details of thread use, scheduling, etc. An Executor is normally 
used instead of explicitly creating threads
   
   So I'd say it's async by nature.
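
   A minimal sketch of the point about `Executor`, assuming nothing beyond the 
JDK: the interface hides which thread runs a task, so the same call site can be 
served by a direct executor or by a pool. `ExecutorSketch` and 
`runAndReportThread` are hypothetical names, not code from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Hypothetical illustration; none of these names come from the PR.
class ExecutorSketch {

    /** Hands one task to the executor and returns the thread that actually ran it. */
    static Thread runAndReportThread(Executor executor) {
        CountDownLatch done = new CountDownLatch(1);
        Thread[] ranOn = new Thread[1];
        executor.execute(() -> {
            ranOn[0] = Thread.currentThread();
            done.countDown();
        });
        try {
            // countDown/await also makes the write to ranOn visible to the caller.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        }
        return ranOn[0];
    }
}
```

   Calling `runAndReportThread(Runnable::run)` returns the caller's own thread, 
while passing `Executors.newSingleThreadExecutor()` returns a pool thread. 
Whether the work is asynchronous therefore depends on the implementation handed 
in, not on the declared type.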

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -90,6 +90,11 @@ public boolean isRunning() {
 
        @Override
        public void run() {
+               final long asyncStartNanos = System.nanoTime();
+               final long asyncStartDelayMillis = (asyncStartNanos - 
asyncConstructionNanos) / 1_000_000L;
+               LOG.debug("{} - started executing asynchronous part of 
checkpoint {}. Asynchronous start delay: {} ms",

Review comment:
       👍 

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -720,54 +720,17 @@ private CheckpointBarrier buildCheckpointBarrier(long id) 
{
        /**
         * The invokable handler used for triggering checkpoint and validation.
         */
-       private class ValidatingCheckpointHandler extends AbstractInvokable {
-
-               private long nextExpectedCheckpointId;
-
-               private long lastCanceledCheckpointId;
+       static class ValidatingCheckpointHandler extends 
CheckpointBarrierAlignerTest.ValidatingCheckpointHandler {

Review comment:
       Maybe it should be put in top level and used by both tests? (So just one 
handler instead of 2)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetricsBuilder.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * A builder for {@link CheckpointMetrics}.
+ *
+ * <p>This class is not thread safe, but parts of it can actually be used from 
different threads.
+ */
+@NotThreadSafe
+public class CheckpointMetricsBuilder {

Review comment:
       👍 to using builder pattern.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
##########
@@ -82,7 +82,9 @@ public void processBarrier(CheckpointBarrier receivedBarrier, 
InputChannelInfo c
 
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       notifyCheckpoint(receivedBarrier, 0);
+                       markCheckpointStart(receivedBarrier.getTimestamp());
+                       markAlignmentEnd(0);

Review comment:
       This looks suboptimal: `markCheckpointStart` calls `resetAlignment`, 
which calls `markAlignmentEnd(0)`. Then we call it again, but this time with 
completely different behavior.
   
   Maybe we need:
   * markCheckpointStart
   * markAlignmentStart (not for the tracker, just for the (Un)Aligner)
   * markAlignmentEnd (just for the (Un)Aligner)
   * setAlignmentDuration (for all)
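
   The split proposed above could be sketched roughly as follows. The four 
method names come from the list; the class name, the fields, the injected clock 
and all method bodies are assumptions made for illustration and are not the 
PR's code.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; method names follow the review comment, bodies are assumptions.
class AlignmentTimingSketch {
    private final LongSupplier nanoClock; // injected clock, an assumption for testability
    private long alignmentStartNanos;
    private boolean alignmentRunning;
    private long alignmentDurationNanos;

    AlignmentTimingSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** All handlers: a new checkpoint begins, so forget the previous alignment. */
    void markCheckpointStart() {
        alignmentRunning = false;
        alignmentDurationNanos = 0L;
    }

    /** (Un)Aligner only: the first barrier arrived. */
    void markAlignmentStart() {
        alignmentStartNanos = nanoClock.getAsLong();
        alignmentRunning = true;
    }

    /** (Un)Aligner only: the last barrier arrived; no-op if no alignment is running. */
    void markAlignmentEnd() {
        if (alignmentRunning) {
            setAlignmentDuration(nanoClock.getAsLong() - alignmentStartNanos);
        }
    }

    /** All handlers: record a known duration, e.g. 0 from the tracker. */
    void setAlignmentDuration(long durationNanos) {
        alignmentDurationNanos = durationNanos;
        alignmentRunning = false;
    }

    long getAlignmentDurationNanos() {
        return alignmentDurationNanos;
    }
}
```

   With this shape the single-channel tracker path would call 
`markCheckpointStart()` followed by `setAlignmentDuration(0)`, and no method 
would need to behave differently depending on who called it first.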

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTest.java
##########
@@ -913,7 +915,8 @@ private static void validateAlignmentTime(long 
alignmentStartTimestamp, Checkpoi
                protected long nextExpectedCheckpointId;
                protected long triggeredCheckpointCounter = 0;
                protected long abortedCheckpointCounter = 0;
-               private CompletableFuture<Long> lastAlignmentDurationNanos;
+               protected CompletableFuture<Long> lastAlignmentDurationNanos;
+               protected List<Long> triggeredCheckpoints = new ArrayList<>();
 
                public ValidatingCheckpointHandler() {

Review comment:
       I second my suggestion to pull it to top-level.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -97,8 +104,13 @@ private void processPriorityEvents() throws IOException, 
InterruptedException {
 
        private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor 
mailboxExecutor) {
                final CompletableFuture<?> priorityEventAvailableFuture = 
inputGate.getPriorityEventAvailableFuture();
-               priorityEventAvailableFuture.thenRun(() ->
-                       mailboxExecutor.execute(this::processPriorityEvents, 
"process priority event @ gate %s", inputGate));
+               
FutureUtils.assertNoException(priorityEventAvailableFuture.thenRun(() -> {
+                       try {
+                               
mailboxExecutor.execute(this::processPriorityEvents, "process priority event @ 
gate %s", inputGate);
+                       } catch (RejectedExecutionException ex) {
+                               LOG.info("Ignored RejectedExecutionException in 
CheckpointedInputGate.waitForPriorityEvents");

Review comment:
       I'd go with debug

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -33,6 +33,7 @@
 import java.util.concurrent.CompletableFuture;

Review comment:
       commit message: [task/network]?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTest.java
##########
@@ -863,19 +860,11 @@ private static BufferOrEvent 
createCancellationBarrier(long checkpointId, int ch
 
        private static BufferOrEvent createBuffer(int channel) {
                final int size = sizeCounter++;
-               byte[] bytes = new byte[size];
-               RND.nextBytes(bytes);
-
-               MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
-               memory.put(0, bytes);
-
-               Buffer buf = new NetworkBuffer(memory, 
FreeingBufferRecycler.INSTANCE);
-               buf.setSize(size);
-
-               // retain an additional time so it does not get disposed after 
being read by the input gate
-               buf.retainBuffer();
+               return createBuffer(channel, size);

Review comment:
       👍 . Should also make debugging easier as size ~ sequence number.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -97,8 +104,13 @@ private void processPriorityEvents() throws IOException, 
InterruptedException {
 
        private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor 
mailboxExecutor) {
                final CompletableFuture<?> priorityEventAvailableFuture = 
inputGate.getPriorityEventAvailableFuture();
-               priorityEventAvailableFuture.thenRun(() ->
-                       mailboxExecutor.execute(this::processPriorityEvents, 
"process priority event @ gate %s", inputGate));
+               
FutureUtils.assertNoException(priorityEventAvailableFuture.thenRun(() -> {

Review comment:
       static import for consistency?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -75,7 +86,11 @@ public void close() throws IOException {
        public abstract long getLatestCheckpointId();
 
        public long getAlignmentDurationNanos() {
-               return 0;
+               if (startOfAlignmentTimestamp <= 0) {
+                       return 
FutureUtils.getOrDefault(latestAlignmentDurationNanos, 0L);

Review comment:
       Potential NPE (fixed with proper `getOrDefault`).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
##########
@@ -31,68 +31,49 @@
        private static final long serialVersionUID = 1L;

Review comment:
       `@ThreadSafe`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -268,4 +271,22 @@ private void logFailedCleanupAttempt() {
                        checkpointMetaData.getCheckpointId());
        }
 
+       /**
+        * Helper class that allows to calculate metrics in the task and 
update/report them in
+        * {@link AsyncCheckpointRunnable}. For example it allows to update 
{@link CheckpointMetrics}

Review comment:
       CheckpointMetrics -> CheckpointMetricsBuilder?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -236,25 +235,32 @@ public void testMetrics() throws Exception {
 
                Thread.sleep(sleepTime);
 
+               long alignmentStartNanos = System.nanoTime();
+
                addSequence(inputGate,
                        createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
                        createBarrier(checkpointId, 1, 
checkpointBarrierCreation),
                        createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
                        createBarrier(checkpointId, 0),
                        createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize));
 
-               long startDelay = System.currentTimeMillis() - 
checkpointBarrierCreation;

Review comment:
       Why is that moved down?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetricsBuilder.java
##########
@@ -69,8 +86,9 @@ public long getCheckpointStartDelayNanos() {
        }
 
        public CheckpointMetrics build() {
+               checkState(alignmentDurationNanos.isDone());
                return new CheckpointMetrics(
-                       alignmentDurationNanos,
+                       FutureUtils.getWithoutException(alignmentDurationNanos),

Review comment:
       Isn't that potentially swallowing exceptions again?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -1054,6 +1054,16 @@ public void onComplete(Throwable failure, U success) {
                return null;
        }
 
+       /**
+        * @return the result of completable future, or the defaultValue if it 
has not yet completed.
+        */
+       public static <T> T getOrDefault(CompletableFuture<T> future, T 
defaultValue) {
+               if (future.isDone()) {
+                       return getWithoutException(future);

Review comment:
       Again this swallows exception and if it does you get `null` instead of 
the default.
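
   One possible shape for a `getOrDefault` that does not swallow failures, 
sketched under the assumption that rethrowing is acceptable to callers. It 
relies on the JDK's `CompletableFuture#getNow`, which returns the fallback only 
while the future is incomplete and throws if the future failed. 
`FutureDefaultsSketch` is a hypothetical holder class, not Flink's `FutureUtils`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the implementation in Flink's FutureUtils.
class FutureDefaultsSketch {

    /**
     * Returns the future's value if it completed normally and defaultValue if it
     * has not completed yet. A failed future rethrows as CompletionException
     * (CancellationException if cancelled) instead of yielding null.
     */
    static <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
        return future.getNow(defaultValue);
    }
}
```

   If callers would rather get the default on failure too, the alternative is 
to check `isCompletedExceptionally()` before reading the value. Which of the 
two behaviours is right here is a design decision for the PR.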

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
##########
@@ -347,9 +365,119 @@ public void testInterleavedCancellationBarriers() throws 
Exception {
                }
        }
 
+       @Test
+       public void testMetrics() throws Exception {
+               List<BufferOrEvent> output = new ArrayList<>();
+               ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
+               int numberOfChannels = 3;
+               inputGate = createCheckpointedInputGate(numberOfChannels, 
handler);
+               int[] sequenceNumbers = new int[numberOfChannels];
+
+               int bufferSize = 100;
+               long checkpointId = 1;
+               long sleepTime = 10;
+
+               long checkpointBarrierCreation = System.currentTimeMillis();
+               long alignmentStartNanos = System.nanoTime();
+
+               Thread.sleep(sleepTime);
+
+               addSequence(
+                       inputGate,
+                       output,
+                       sequenceNumbers,
+                       createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
+                       createBarrier(checkpointId, 1, 
checkpointBarrierCreation),
+                       createBuffer(0, bufferSize), createBuffer(2, 
bufferSize),
+                       createBarrier(checkpointId, 0),
+                       createBuffer(2, bufferSize));
+
+               Thread.sleep(sleepTime);
+
+               addSequence(inputGate,
+                       output,
+                       sequenceNumbers,
+                       createBarrier(checkpointId, 2),
+                       createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
+                       createEndOfPartition(0), createEndOfPartition(1), 
createEndOfPartition(2));
+
+               long startDelay = System.currentTimeMillis() - 
checkpointBarrierCreation;
+               long alignmentDuration = System.nanoTime() - 
alignmentStartNanos;
+
+               assertThat(inputGate.getCheckpointStartDelayNanos() / 
1_000_000, Matchers.greaterThanOrEqualTo(sleepTime));
+               assertThat(inputGate.getCheckpointStartDelayNanos() / 
1_000_000, Matchers.lessThanOrEqualTo(startDelay));
+
+               assertTrue(handler.getLastAlignmentDurationNanos().isDone());
+               assertThat(handler.getLastAlignmentDurationNanos().get() / 
1_000_000, Matchers.greaterThanOrEqualTo(sleepTime));

Review comment:
       Are you really sure that this changed semantics of alignment in the 
tracker is correct? Afaik it always used to be 0.
   If yes, I'd expect some documentation/changelog to go with that (haven't 
checked later commits yet). Also please add that to the commit message.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -106,13 +121,38 @@ protected void notifyAbortOnCancellationBarrier(long 
checkpointId) throws IOExce
        }
 
        protected void notifyAbort(long checkpointId, CheckpointException 
cause) throws IOException {
+               resetAlignment();
                toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, 
cause);
        }
 
        protected void markCheckpointStart(long checkpointCreationTimestamp) {
                latestCheckpointStartDelayNanos = 1_000_000 * Math.max(
                        0,
                        System.currentTimeMillis() - 
checkpointCreationTimestamp);
+
+               resetAlignment();
+               startOfAlignmentTimestamp = System.nanoTime();
+       }
+
+       protected void markAlignmentEnd() {
+               if (startOfAlignmentTimestamp > 0) {
+                       markAlignmentEnd(System.nanoTime() - 
startOfAlignmentTimestamp);
+               }
+               else {
+                       markAlignmentEnd(0);
+               }
+       }
+
+       protected void markAlignmentEnd(long alignmentDuration) {
+               if (!latestAlignmentDurationNanos.isDone()) {
+                       
latestAlignmentDurationNanos.complete(alignmentDuration);
+               }
+               startOfAlignmentTimestamp = 0;

Review comment:
       Extract constant and use it for field initialization and comparisons.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -75,7 +86,11 @@ public void close() throws IOException {
        public abstract long getLatestCheckpointId();
 
        public long getAlignmentDurationNanos() {
-               return 0;
+               if (startOfAlignmentTimestamp <= 0) {

Review comment:
       `System.nanoTime` could potentially return negative values and the 
difference is still plausible. I'd probably just use a specific tag value (no 
need to encode unset as `null`).
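
   A minimal sketch of the tag-value idea, assuming `Long.MIN_VALUE` as the 
marker for "no alignment in progress". The constant name, the class and the 
explicit `nowNanos` parameters are hypothetical; they only make the behaviour 
with negative `System.nanoTime()` readings easy to see.

```java
// Hypothetical sketch; the constant and its value are assumptions, not the PR's choice.
class AlignmentStartSketch {
    /** Tag value meaning "no alignment in progress". */
    static final long OUTSIDE_OF_ALIGNMENT = Long.MIN_VALUE;

    private long startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT;

    void markAlignmentStart(long nowNanos) {
        startOfAlignmentTimestamp = nowNanos;
    }

    /** Returns the elapsed nanos and clears the marker, or 0 if no alignment was running. */
    long markAlignmentEnd(long nowNanos) {
        if (startOfAlignmentTimestamp == OUTSIDE_OF_ALIGNMENT) {
            return 0L;
        }
        // The difference of two nanoTime readings stays meaningful even if both are negative.
        long duration = nowNanos - startOfAlignmentTimestamp;
        startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT;
        return duration;
    }
}
```

   Starting at -500 and ending at -200 yields 300 ns here, whereas a 
`startOfAlignmentTimestamp > 0` check would treat the alignment as never 
started. `Long.MIN_VALUE` is still a theoretically possible clock reading, so a 
separate boolean flag would be the stricter option.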

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetricsBuilder.java
##########
@@ -92,9 +108,11 @@ public long getCheckpointStartDelayNanos() {
 
        public CheckpointMetrics build() {
                checkState(alignmentDurationNanos.isDone());
+               checkState(bytesProcessedDuringAlignment.isDone());
                return new CheckpointMetrics(
+                       getWithoutException(bytesProcessedDuringAlignment),
                        bytesPersistedDuringAlignment,
-                       FutureUtils.getWithoutException(alignmentDurationNanos),

Review comment:
       nit: already use static import in original commit.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
##########
@@ -122,6 +122,8 @@ public void processBarrier(CheckpointBarrier 
receivedBarrier, InputChannelInfo c
                        if (barrierId > currentCheckpointId) {
                                // new checkpoint
                                currentCheckpointId = barrierId;
+                               
markCheckpointStart(receivedBarrier.getTimestamp());

Review comment:
       Is the previous commit even "correct" without this change? It feels that 
they should be squashed.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -106,13 +121,38 @@ protected void notifyAbortOnCancellationBarrier(long 
checkpointId) throws IOExce
        }
 
        protected void notifyAbort(long checkpointId, CheckpointException 
cause) throws IOException {
+               resetAlignment();
                toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, 
cause);
        }
 
        protected void markCheckpointStart(long checkpointCreationTimestamp) {
                latestCheckpointStartDelayNanos = 1_000_000 * Math.max(
                        0,
                        System.currentTimeMillis() - 
checkpointCreationTimestamp);
+
+               resetAlignment();
+               startOfAlignmentTimestamp = System.nanoTime();
+       }
+
+       protected void markAlignmentEnd() {
+               if (startOfAlignmentTimestamp > 0) {
+                       markAlignmentEnd(System.nanoTime() - 
startOfAlignmentTimestamp);
+               }
+               else {
+                       markAlignmentEnd(0);
+               }
+       }
+
+       protected void markAlignmentEnd(long alignmentDuration) {

Review comment:
       This method is odd: it doesn't really fit with `markCheckpointStart` 
(`markAlignmentEnd()` does). Maybe it should simply be called 
`setAlignmentDuration`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -200,6 +214,8 @@ public boolean equals(Object o) {
                        stateSize == that.stateSize &&
                        duration == that.duration &&
                        alignmentBuffered == that.alignmentBuffered &&
+                       Objects.equals(processedData, that.processedData) &&

Review comment:
       Atypical to see `equals` used for longs.
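
   For reference, a small sketch of the two comparison styles. Whether 
`processedData` is a primitive `long` or a nullable `Long` is not visible in 
this hunk, so both cases are shown; `LongEqualitySketch` and its methods are 
hypothetical.

```java
import java.util.Objects;

// Hypothetical illustration; the field types in CheckpointStatistics are not assumed here.
class LongEqualitySketch {

    /** Primitive longs: == compares values directly, with no boxing. */
    static boolean samePrimitive(long a, long b) {
        return a == b;
    }

    /** Boxed, possibly null Longs: Objects.equals is null-safe and compares values. */
    static boolean sameBoxed(Long a, Long b) {
        return Objects.equals(a, b);
    }
}
```

   Passing primitives to `Objects.equals` still gives the right answer, but 
only after autoboxing both sides, which is why `==` is the usual choice for 
`long` fields in `equals` implementations.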

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
##########
@@ -347,9 +365,119 @@ public void testInterleavedCancellationBarriers() throws 
Exception {
                }
        }
 
+       @Test
+       public void testMetrics() throws Exception {
+               List<BufferOrEvent> output = new ArrayList<>();
+               ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
+               int numberOfChannels = 3;
+               inputGate = createCheckpointedInputGate(numberOfChannels, 
handler);
+               int[] sequenceNumbers = new int[numberOfChannels];
+
+               int bufferSize = 100;
+               long checkpointId = 1;
+               long sleepTime = 10;
+
+               long checkpointBarrierCreation = System.currentTimeMillis();
+               long alignmentStartNanos = System.nanoTime();
+
+               Thread.sleep(sleepTime);
+
+               addSequence(
+                       inputGate,
+                       output,
+                       sequenceNumbers,
+                       createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
+                       createBarrier(checkpointId, 1, 
checkpointBarrierCreation),
+                       createBuffer(0, bufferSize), createBuffer(2, 
bufferSize),
+                       createBarrier(checkpointId, 0),
+                       createBuffer(2, bufferSize));
+
+               Thread.sleep(sleepTime);
+
+               addSequence(inputGate,
+                       output,
+                       sequenceNumbers,
+                       createBarrier(checkpointId, 2),
+                       createBuffer(0, bufferSize), createBuffer(1, 
bufferSize), createBuffer(2, bufferSize),
+                       createEndOfPartition(0), createEndOfPartition(1), 
createEndOfPartition(2));
+
+               long startDelay = System.currentTimeMillis() - 
checkpointBarrierCreation;
+               long alignmentDuration = System.nanoTime() - 
alignmentStartNanos;
+
+               assertThat(inputGate.getCheckpointStartDelayNanos() / 
1_000_000, Matchers.greaterThanOrEqualTo(sleepTime));
+               assertThat(inputGate.getCheckpointStartDelayNanos() / 
1_000_000, Matchers.lessThanOrEqualTo(startDelay));

Review comment:
       
is(both(greaterThanOrEqualTo(sleepTime)).and(lessThanOrEqualTo(startDelay))) to 
show that it's a range?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
##########
@@ -56,39 +72,4 @@ public void testSimpleAccess() throws Exception {
                assertEquals(0, stats.getEndToEndDuration(ackTimestamp + 1));
        }
 
-       /**
-        * Tests that the snapshot is actually serializable.
-        */
-       @Test
-       public void testIsJavaSerializable() throws Exception {

Review comment:
       should probably be a separate commit by your standards, but this PR 
already has enough commits for me ;)

##########
File path: flink-runtime-web/web-dashboard/proxy.conf.json
##########
@@ -1,7 +1,7 @@
 [
   {
     "context": ["/"],
-    "target": "http://localhost:8081",
+    "target": "http://localhost:12345",

Review comment:
       Revert

##########
File path: docs/monitoring/metrics.md
##########
@@ -1354,7 +1354,7 @@ Metrics related to data exchange between task executors 
using netty network comm
     <tr>
       <th rowspan="2"><strong>Task</strong></th>
       <td>checkpointAlignmentTime</td>
-      <td>The time in nanoseconds that the last barrier alignment took to 
complete, or how long the current alignment has taken so far (in 
nanoseconds).</td>
+      <td>The time in nanoseconds that the last barrier alignment took to 
complete, or how long the current alignment has taken so far (in nanoseconds). 
This is the time between receiving first and the last checkpoint barrier. You 
can find more information in the [Monitoring State and Checkpoints section]({{ 
site.baseurl 
}}/ops/state/large_state_tuning.html#monitoring-state-and-checkpoints)</td>

Review comment:
       Aren't all the other new metrics missing?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
##########
@@ -68,6 +68,8 @@ private MetricNames() {
        public static final String CHECKPOINT_ALIGNMENT_TIME = 
"checkpointAlignmentTime";
        public static final String CHECKPOINT_START_DELAY_TIME = 
"checkpointStartDelayNanos";
 

Review comment:
       nit: no newline?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -104,12 +104,19 @@ public void processBarrier(CheckpointBarrier barrier, 
InputChannelInfo channelIn
                        for (final CheckpointableInput input : inputs) {
                                input.checkpointStarted(barrier);
                        }
-                       notifyCheckpoint(barrier, 0);
+                       notifyCheckpoint(barrier);
                }
                if (currentCheckpointId == barrierId) {
                        LOG.debug("{}: Received barrier from channel {} @ {}.", 
taskName, channelInfo, barrierId);
 
                        if (++numBarriersReceived == numOpenChannels) {
+                               if (getNumOpenChannels() > 1) {
+                                       markAlignmentEnd();
+                               }
+                               else {
+                                       // Make sure to report 0 for single 
channel case
+                                       markAlignmentEnd(0);

Review comment:
       While this is technically the correct way, it might be overengineered. 
I wonder what impact a few ns of alignment would have. Don't we render ms anyway?
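
   To make the ns-versus-ms question concrete, here is a tiny sketch assuming 
the value is shown in whole milliseconds, as the comment supposes. 
`AlignmentRenderSketch` and `toRenderedMillis` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; how the UI actually renders the value is an assumption.
class AlignmentRenderSketch {

    /** Converts an alignment duration in nanoseconds to whole milliseconds (truncating). */
    static long toRenderedMillis(long alignmentNanos) {
        return TimeUnit.NANOSECONDS.toMillis(alignmentNanos);
    }
}
```

   A sub-millisecond alignment such as 750 ns truncates to 0 ms, so under that 
assumption the special case for a single channel would not change what a user 
sees.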

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -106,13 +121,38 @@ protected void notifyAbortOnCancellationBarrier(long 
checkpointId) throws IOExce
        }
 
        protected void notifyAbort(long checkpointId, CheckpointException 
cause) throws IOException {
+               resetAlignment();
                toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, 
cause);
        }
 
        protected void markCheckpointStart(long checkpointCreationTimestamp) {
                latestCheckpointStartDelayNanos = 1_000_000 * Math.max(
                        0,
                        System.currentTimeMillis() - 
checkpointCreationTimestamp);
+
+               resetAlignment();
+               startOfAlignmentTimestamp = System.nanoTime();
+       }
+
+       protected void markAlignmentEnd() {
+               if (startOfAlignmentTimestamp > 0) {

Review comment:
       again could be negative.

##########
File path: 
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
##########
@@ -128,7 +128,7 @@
           <th><strong>Latest Acknowledgement</strong></th>
           <th><strong>End to End Duration</strong></th>
           <th><strong>Checkpointed Data Size</strong></th>
-          <th><strong>Buffered During Alignment</strong></th>
+          <th><strong>Processed (persisted) in-flight data</strong></th>

Review comment:
       It seems `in-flight` is not as common as I (we?) thought. It caused 
issues in the last blog post. I wonder if we can simply drop it.
   I'm also wondering if it should be more explicit: `Processed (persisted) 
data during alignment`. Alternatively, have you thought about simply calling it 
`Received data during alignment`?

##########
File path: docs/monitoring/checkpoint_monitoring.md
##########
@@ -62,7 +62,19 @@ The checkpoint history keeps statistics about recently 
triggered checkpoints, in
 - **Latest Acknowledgement**: The time when the latest acknowledgement for any 
subtask was received at the JobManager (or n/a if no acknowledgement received 
yet).
 - **End to End Duration**: The duration from the trigger timestamp until the 
latest acknowledgement (or n/a if no acknowledgement received yet). This end to 
end duration for a complete checkpoint is determined by the last subtask that 
acknowledges the checkpoint. This time is usually larger than single subtasks 
need to actually checkpoint the state.
 - **Checkpointed Data Size**: The checkpointed data size over all acknowledged 
subtasks. If incremental checkpointing is enabled this value is the 
checkpointed data size delta.
-- **Buffered During Alignment**: The number of bytes buffered during alignment 
over all acknowledged subtasks. This is only > 0 if a stream alignment takes 
place during checkpointing. If the checkpointing mode is `AT_LEAST_ONCE` this 
will always be zero as at least once mode does not require stream alignment.
+- **Processed in-flight data**: The approximate number of bytes processed 
during the alignment (time between receiving the first and the last checkpoint 
barrier) over all acknowledged subtasks. This value might be not accurate, as 
it's currently measured on the network level, ignoring the data buffered in the 
records deserialiser.

Review comment:
       > This value might be not accurate, as it's currently measured on the 
network level, ignoring the data buffered in the records deserialiser.
   I'd assume it's way too technical. Just leave it out; you said `approximate` 
initially.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -121,11 +123,15 @@ public void run() {
                                
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                                        operatorID,
                                        finalizedSnapshots.getTaskLocalState());
+
+                               bytesPersistedDuringAlignment += 
finalizedSnapshots.getJobManagerOwnedState().getResultSubpartitionState().getStateSize();
+                               bytesPersistedDuringAlignment += 
finalizedSnapshots.getJobManagerOwnedState().getInputChannelState().getStateSize();
                        }
 
                        final long asyncEndNanos = System.nanoTime();
                        final long asyncDurationMillis = (asyncEndNanos - 
asyncConstructionNanos) / 1_000_000L;
 
+                       
checkpointMetrics.setBytesPersistedDuringAlignment(bytesPersistedDuringAlignment);

Review comment:
       Just to double-check, that should also account for the mini state that is 
stored in `_metadata`, right?

##########
File path: docs/monitoring/metrics.md
##########
@@ -1354,7 +1354,7 @@ Metrics related to data exchange between task executors 
using netty network comm
     <tr>
       <th rowspan="2"><strong>Task</strong></th>
       <td>checkpointAlignmentTime</td>
-      <td>The time in nanoseconds that the last barrier alignment took to 
complete, or how long the current alignment has taken so far (in 
nanoseconds).</td>
+      <td>The time in nanoseconds that the last barrier alignment took to 
complete, or how long the current alignment has taken so far (in nanoseconds). 
This is the time between receiving first and the last checkpoint barrier. You 
can find more information in the [Monitoring State and Checkpoints section]({{ 
site.baseurl 
}}/ops/state/large_state_tuning.html#monitoring-state-and-checkpoints)</td>
       <td>Gauge</td>

Review comment:
       You also need to update the REST page :/.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -56,6 +56,13 @@
        /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started. */
        private long startOfAlignmentTimestamp;
 
+       /**
+        * Cumulative counter of bytes processed during alignment. Once we 
complete alignment, we will
+        * put this value into the {@link #latestBytesProcessedDuringAlignment}.
+        */
+       private long bytesProcessedDuringAlignment;
+       private CompletableFuture<Long> latestBytesProcessedDuringAlignment = 
new CompletableFuture<>();

Review comment:
       At this point, I was wondering if we really need all futures? At the 
point where we are calling `metrics#build`, we are assuming everything is done 
anyways. Wouldn't it be enough to just `sync(builder)` when setting the final 
values?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
##########
@@ -56,6 +56,13 @@
        /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started. */
        private long startOfAlignmentTimestamp;
 
+       /**
+        * Cumulative counter of bytes processed during alignment. Once we 
complete alignment, we will
+        * put this value into the {@link #latestBytesProcessedDuringAlignment}.
+        */
+       private long bytesProcessedDuringAlignment;
+       private CompletableFuture<Long> latestBytesProcessedDuringAlignment = 
new CompletableFuture<>();

Review comment:
       We could even pull up `Unaligner#allBarriersReceivedFuture` to have a 
future to listen to and at which time we can expect the handler to have updated 
all metrics.

##########
File path: 
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
##########
@@ -151,7 +151,7 @@
             <td *ngIf="checkpoint['end_to_end_duration'] >= 0">{{ 
checkpoint['end_to_end_duration'] | humanizeDuration}}</td>
             <td *ngIf="checkpoint['end_to_end_duration'] <0">n/a</td>
             <td>{{ checkpoint['state_size'] | humanizeBytes }}</td>
-            <td>{{ checkpoint['alignment_buffered'] | humanizeBytes }}</td>
+            <td>{{ checkpoint['processed_data'] | humanizeBytes }} ({{ 
checkpoint['persisted_data'] | humanizeBytes }})</td>

Review comment:
       How much space have we wasted already? Could we simply add two columns? 
One of them would only be 0B but it would make things more explicit.

##########
File path: 
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
##########
@@ -151,7 +151,7 @@
             <td *ngIf="checkpoint['end_to_end_duration'] >= 0">{{ 
checkpoint['end_to_end_duration'] | humanizeDuration}}</td>
             <td *ngIf="checkpoint['end_to_end_duration'] <0">n/a</td>
             <td>{{ checkpoint['state_size'] | humanizeBytes }}</td>
-            <td>{{ checkpoint['alignment_buffered'] | humanizeBytes }}</td>
+            <td>{{ checkpoint['processed_data'] | humanizeBytes }} ({{ 
checkpoint['persisted_data'] | humanizeBytes }})</td>

Review comment:
       How will that render? Is one empty? Shouldn't it be `0 B`? So would the 
result look like `0 B 123 KB`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

