zentol closed pull request #6658: [FLINK-10243][metrics] Make latency metrics granularity configurable
URL: https://github.com/apache/flink/pull/6658
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 02f4ceb162f..0c0b0dd2ffb 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,6 +7,11 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.latency.granularity</h5></td>
+            <td style="word-wrap: break-word;">"operator"</td>
+            <td>Defines the granularity of latency metrics. Accepted values 
are:<ul><li>single - Track latency without differentiating between sources and 
subtasks.</li><li>operator - Track latency while differentiating between 
sources, but not subtasks.</li><li>subtask - Track latency while 
differentiating between sources and subtasks.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>metrics.latency.history-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 336ead6e193..67444a5397c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * Configuration options for metrics and metric reporters.
@@ -112,6 +113,17 @@
                                " Disables latency tracking if set to 0 or a 
negative value. Enabling this feature can significantly" +
                                " impact the performance of the cluster.");
 
+       public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY = // granularity of latency metrics; accepted values are listed in the description below
+               key("metrics.latency.granularity")
+                       .defaultValue("operator") // AbstractStreamOperator#setup upper-cases this with Locale.ROOT and parses it via LatencyStats.Granularity.valueOf; invalid values fall back to OPERATOR with a warning
+                       .withDescription(Description.builder()
+                               .text("Defines the granularity of latency 
metrics. Accepted values are:")
+                               .list(
+                                       text("single - Track latency without 
differentiating between sources and subtasks."),
+                                       text("operator - Track latency while 
differentiating between sources, but not subtasks."),
+                                       text("subtask - Track latency while 
differentiating between sources and subtasks."))
+                               .build());
+
        /** The number of measured latencies to maintain at each operator. */
        public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
                key("metrics.latency.history-size")
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f52168bd9b9..f3c22080ab7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,6 +72,7 @@
 
 import java.io.Closeable;
 import java.io.Serializable;
+import java.util.Locale;
 
 /**
  * Base class for all stream operators. Operators that contain a user function 
should extend the class
@@ -193,11 +194,33 @@ public void setup(StreamTask<?, ?> containingTask, 
StreamConfig config, Output<S
                                LOG.warn("{} has been set to a value equal or 
below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
                                historySize = 
MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
                        }
+
+                       final String configuredGranularity = 
taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+                       LatencyStats.Granularity granularity;
+                       try {
+                               granularity = 
LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+                       } catch (IllegalArgumentException iae) {
+                               granularity = LatencyStats.Granularity.OPERATOR;
+                               LOG.warn(
+                                       "Configured value {} option for {} is 
invalid. Defaulting to {}.",
+                                       configuredGranularity,
+                                       
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+                                       granularity);
+                       }
                        TaskManagerJobMetricGroup jobMetricGroup = 
this.metrics.parent().parent();
-                       this.latencyStats = new 
LatencyStats(jobMetricGroup.addGroup("latency"), historySize, 
container.getIndexInSubtaskGroup(), getOperatorID());
+                       this.latencyStats = new 
LatencyStats(jobMetricGroup.addGroup("latency"),
+                               historySize,
+                               container.getIndexInSubtaskGroup(),
+                               getOperatorID(),
+                               granularity);
                } catch (Exception e) {
                        LOG.warn("An error occurred while instantiating latency 
metrics.", e);
-                       this.latencyStats = new 
LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
 1, 0, new OperatorID());
+                       this.latencyStats = new LatencyStats(
+                               
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+                               1,
+                               0,
+                               new OperatorID(),
+                               LatencyStats.Granularity.SINGLE);
                }
 
                this.runtimeContext = new StreamingRuntimeContext(this, 
environment, container.getAccumulatorMap());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
index 4f3d33ec6f9..926753dc78f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -34,23 +34,29 @@
        private final int historySize;
        private final int subtaskIndex;
        private final OperatorID operatorId;
+       private final Granularity granularity;
 
-       public LatencyStats(MetricGroup metricGroup, int historySize, int 
subtaskIndex, OperatorID operatorID) {
+       public LatencyStats(
+                       MetricGroup metricGroup,
+                       int historySize,
+                       int subtaskIndex,
+                       OperatorID operatorID,
+                       Granularity granularity) { // granularity selects the histogram key and the source metric groups used by reportLatency
                this.metricGroup = metricGroup;
                this.historySize = historySize;
                this.subtaskIndex = subtaskIndex;
                this.operatorId = operatorID;
+               this.granularity = granularity; // NOTE(review): not null-checked here; a null value would only surface as an NPE on the first reportLatency call
        }
 
        public void reportLatency(LatencyMarker marker) {
-               String uniqueName =  "" + marker.getOperatorId() + 
marker.getSubtaskIndex() + operatorId + subtaskIndex;
+               final String uniqueName = 
granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
+
                DescriptiveStatisticsHistogram latencyHistogram = 
this.latencyStats.get(uniqueName);
                if (latencyHistogram == null) {
                        latencyHistogram = new 
DescriptiveStatisticsHistogram(this.historySize);
                        this.latencyStats.put(uniqueName, latencyHistogram);
-                       this.metricGroup
-                               .addGroup("source_id", 
String.valueOf(marker.getOperatorId()))
-                               .addGroup("source_subtask_index", 
String.valueOf(marker.getSubtaskIndex()))
+                       granularity.createSourceMetricGroups(metricGroup, 
marker, operatorId, subtaskIndex)
                                .addGroup("operator_id", 
String.valueOf(operatorId))
                                .addGroup("operator_subtask_index", 
String.valueOf(subtaskIndex))
                                .histogram("latency", latencyHistogram);
@@ -59,4 +65,62 @@ public void reportLatency(LatencyMarker marker) {
                long now = System.currentTimeMillis();
                latencyHistogram.update(now - marker.getMarkedTime());
        }
+
+       /**
+        * Granularity for latency metrics.
+        *
+        * <p>Each constant supplies the two strategies used by {@code reportLatency}: the key under
+        * which a latency histogram is cached ({@link #createUniqueHistogramName}), and the
+        * source-related metric groups that are added before the {@code operator_id} and
+        * {@code operator_subtask_index} groups ({@link #createSourceMetricGroups}).
+        * <ul>
+        *     <li>{@link #SINGLE}: the key depends only on this operator id and subtask index, so all
+        *     markers share one histogram and no source groups are added.</li>
+        *     <li>{@link #OPERATOR}: one histogram per source operator id, registered below a
+        *     {@code source_id} group.</li>
+        *     <li>{@link #SUBTASK}: one histogram per source operator id and source subtask index,
+        *     registered below {@code source_id} and {@code source_subtask_index} groups.</li>
+        * </ul>
+        */
+       public enum Granularity {
+               SINGLE {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(operatorId) + 
operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base; // no source-specific groups: the source of the marker is ignored
+                       }
+               },
+               OPERATOR {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(marker.getOperatorId()) + 
operatorId + operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base
+                                       .addGroup("source_id", 
String.valueOf(marker.getOperatorId()));
+                       }
+               },
+               SUBTASK {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(marker.getOperatorId()) + 
marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base
+                                       .addGroup("source_id", 
String.valueOf(marker.getOperatorId()))
+                                       .addGroup("source_subtask_index", 
String.valueOf(marker.getSubtaskIndex()));
+                       }
+               };
+               // Returns the key under which reportLatency caches the histogram for the given marker.
+               abstract String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex);
+               // Returns the group below which reportLatency adds operator_id, operator_subtask_index and the histogram.
+               abstract MetricGroup createSourceMetricGroups(MetricGroup base, 
LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
new file mode 100644
index 00000000000..ef14dcbb96b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Tests for the {@link LatencyStats}.
+ *
+ * <p>Every test reports the same five latency markers: three from {@code SOURCE_ID_1} (subtasks 0,
+ * 0 and 1) and two from {@code SOURCE_ID_2} (subtasks 2 and 3). It then checks, for the given
+ * {@link LatencyStats.Granularity}, how many histograms were registered, their metric identifiers,
+ * and how many markers each histogram received.
+ */
+public class LatencyStatsTest extends TestLogger {
+
+       private static final OperatorID OPERATOR_ID = new OperatorID();
+       private static final OperatorID SOURCE_ID_1 = new OperatorID();
+       private static final OperatorID SOURCE_ID_2 = new OperatorID();
+
+       private static final int OPERATOR_SUBTASK_INDEX = 64;
+
+       private static final String PARENT_GROUP_NAME = "parent";
+
+       @Test
+       public void testLatencyStatsSingle() {
+               testLatencyStats(LatencyStats.Granularity.SINGLE, registrations 
-> {
+                       Assert.assertEquals(1, registrations.size()); // all five markers share a single histogram
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0);
+                               Assert.assertEquals(5, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       @Test
+       public void testLatencyStatsOperator() {
+               testLatencyStats(LatencyStats.Granularity.OPERATOR, 
registrations -> {
+                       Assert.assertEquals(2, registrations.size()); // one histogram per source operator
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0, SOURCE_ID_1);
+                               Assert.assertEquals(3, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(1);
+                               assertName(registration.f0, SOURCE_ID_2);
+                               Assert.assertEquals(2, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       @Test
+       public void testLatencyStatsSubtask() {
+               testLatencyStats(LatencyStats.Granularity.SUBTASK, 
registrations -> {
+                       Assert.assertEquals(4, registrations.size()); // one histogram per (source operator, source subtask) pair
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0, SOURCE_ID_1, 0);
+                               Assert.assertEquals(2, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(1);
+                               assertName(registration.f0, SOURCE_ID_1, 1);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(2);
+                               assertName(registration.f0, SOURCE_ID_2, 2);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(3);
+                               assertName(registration.f0, SOURCE_ID_2, 3);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       private static void testLatencyStats( // shared driver: reports five fixed markers, then runs the verifier
+               final LatencyStats.Granularity granularity,
+               final Consumer<List<Tuple2<String, Histogram>>> verifier) {
+
+               final AbstractMetricGroup<?> dummyGroup = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+               final TestMetricRegistry registry = new TestMetricRegistry();
+               final MetricGroup parentGroup = new 
GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);
+
+               final LatencyStats latencyStats = new LatencyStats(
+                       parentGroup,
+                       MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
+                       OPERATOR_SUBTASK_INDEX,
+                       OPERATOR_ID,
+                       granularity);
+
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
0));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
0));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
1));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 
2));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 
3));
+
+               verifier.accept(registry.latencyHistograms); // histograms in the order they were registered
+       }
+
+       /**
+        * Removes all parts from the metric identifier preceding the 
latency-related parts.
+        *
+        * <p>Everything up to and including the last occurrence of {@code PARENT_GROUP_NAME}, plus the
+        * single delimiter that follows it, is dropped. The result therefore starts with
+        * {@code source_id} or {@code operator_id}.
+        */
+       private static String sanitizeName(final String registrationName) {
+               return 
registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + 
PARENT_GROUP_NAME.length() + 1);
+       }
+
+       private static void assertName(final String registrationName) { // expected identifier for SINGLE granularity
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static void assertName(final String registrationName, final 
OperatorID sourceId) { // expected identifier for OPERATOR granularity
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("source_id." + sourceId +
+                       ".operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static void assertName(final String registrationName, final 
OperatorID sourceId, final int sourceIndex) { // expected identifier for SUBTASK granularity
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("source_id." + sourceId +
+                       ".source_subtask_index." + sourceIndex +
+                       ".operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static class TestMetricRegistry implements MetricRegistry { // records each registered histogram together with its metric identifier
+
+               private final List<Tuple2<String, Histogram>> latencyHistograms 
= new ArrayList<>(4);
+
+               @Override
+               public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+                       if (metric instanceof Histogram) {
+                               
latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), 
(Histogram) metric));
+                       }
+               }
+
+               @Override
+               public char getDelimiter() {
+                       return '.';
+               }
+
+               @Override
+               public char getDelimiter(int index) {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberReporters() {
+                       return 0;
+               }
+
+               @Override
+               public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
+                       // no-op: recorded histograms are kept so the verifier can inspect them
+               }
+
+               @Override
+               public ScopeFormats getScopeFormats() {
+                       return null;
+               }
+
+               @Nullable
+               @Override
+               public String getMetricQueryServicePath() {
+                       return null;
+               }
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to