zentol closed pull request #6658: [FLINK-10243][metrics] Make latency metrics granularity configurable. URL: https://github.com/apache/flink/pull/6658
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 02f4ceb162f..0c0b0dd2ffb 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -7,6 +7,11 @@ </tr> </thead> <tbody> + <tr> + <td><h5>metrics.latency.granularity</h5></td> + <td style="word-wrap: break-word;">"operator"</td> + <td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td> + </tr> <tr> <td><h5>metrics.latency.history-size</h5></td> <td style="word-wrap: break-word;">128</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 336ead6e193..67444a5397c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.TextElement.text; /** * Configuration options for metrics and metric reporters. @@ -112,6 +113,17 @@ " Disables latency tracking if set to 0 or a negative value. 
Enabling this feature can significantly" + " impact the performance of the cluster."); + public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY = + key("metrics.latency.granularity") + .defaultValue("operator") + .withDescription(Description.builder() + .text("Defines the granularity of latency metrics. Accepted values are:") + .list( + text("single - Track latency without differentiating between sources and subtasks."), + text("operator - Track latency while differentiating between sources, but not subtasks."), + text("subtask - Track latency while differentiating between sources and subtasks.")) + .build()); + /** The number of measured latencies to maintain at each operator. */ public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE = key("metrics.latency.history-size") diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index f52168bd9b9..f3c22080ab7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -72,6 +72,7 @@ import java.io.Closeable; import java.io.Serializable; +import java.util.Locale; /** * Base class for all stream operators. Operators that contain a user function should extend the class @@ -193,11 +194,33 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S LOG.warn("{} has been set to a value equal or below 0: {}. 
Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); } + + final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY); + LatencyStats.Granularity granularity; + try { + granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException iae) { + granularity = LatencyStats.Granularity.OPERATOR; + LOG.warn( + "Configured value {} option for {} is invalid. Defaulting to {}.", + configuredGranularity, + MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), + granularity); + } TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent(); - this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID()); + this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), + historySize, + container.getIndexInSubtaskGroup(), + getOperatorID(), + granularity); } catch (Exception e) { LOG.warn("An error occurred while instantiating latency metrics.", e); - this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID()); + this.latencyStats = new LatencyStats( + UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), + 1, + 0, + new OperatorID(), + LatencyStats.Granularity.SINGLE); } this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java index 4f3d33ec6f9..926753dc78f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java @@ -34,23 +34,29 @@ private final int historySize; private final int subtaskIndex; private final OperatorID operatorId; + private final Granularity granularity; - public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) { + public LatencyStats( + MetricGroup metricGroup, + int historySize, + int subtaskIndex, + OperatorID operatorID, + Granularity granularity) { this.metricGroup = metricGroup; this.historySize = historySize; this.subtaskIndex = subtaskIndex; this.operatorId = operatorID; + this.granularity = granularity; } public void reportLatency(LatencyMarker marker) { - String uniqueName = "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex; + final String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex); + DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName); if (latencyHistogram == null) { latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize); this.latencyStats.put(uniqueName, latencyHistogram); - this.metricGroup - .addGroup("source_id", String.valueOf(marker.getOperatorId())) - .addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex())) + granularity.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex) .addGroup("operator_id", String.valueOf(operatorId)) .addGroup("operator_subtask_index", String.valueOf(subtaskIndex)) .histogram("latency", latencyHistogram); @@ -59,4 +65,62 @@ public void reportLatency(LatencyMarker marker) { long now = System.currentTimeMillis(); latencyHistogram.update(now - marker.getMarkedTime()); } + + /** + * Granularity for latency metrics. 
+ */ + public enum Granularity { + SINGLE { + @Override + String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) { + return String.valueOf(operatorId) + operatorSubtaskIndex; + } + + @Override + MetricGroup createSourceMetricGroups( + MetricGroup base, + LatencyMarker marker, + OperatorID operatorId, + int operatorSubtaskIndex) { + return base; + } + }, + OPERATOR { + @Override + String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) { + return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex; + } + + @Override + MetricGroup createSourceMetricGroups( + MetricGroup base, + LatencyMarker marker, + OperatorID operatorId, + int operatorSubtaskIndex) { + return base + .addGroup("source_id", String.valueOf(marker.getOperatorId())); + } + }, + SUBTASK { + @Override + String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) { + return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex; + } + + @Override + MetricGroup createSourceMetricGroups( + MetricGroup base, + LatencyMarker marker, + OperatorID operatorId, + int operatorSubtaskIndex) { + return base + .addGroup("source_id", String.valueOf(marker.getOperatorId())) + .addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex())); + } + }; + + abstract String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex); + + abstract MetricGroup createSourceMetricGroups(MetricGroup base, LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java new file mode 100644 index 00000000000..ef14dcbb96b --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.GenericMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * Tests for the {@link LatencyStats}. 
+ */ +public class LatencyStatsTest extends TestLogger { + + private static final OperatorID OPERATOR_ID = new OperatorID(); + private static final OperatorID SOURCE_ID_1 = new OperatorID(); + private static final OperatorID SOURCE_ID_2 = new OperatorID(); + + private static final int OPERATOR_SUBTASK_INDEX = 64; + + private static final String PARENT_GROUP_NAME = "parent"; + + @Test + public void testLatencyStatsSingle() { + testLatencyStats(LatencyStats.Granularity.SINGLE, registrations -> { + Assert.assertEquals(1, registrations.size()); + + { + final Tuple2<String, Histogram> registration = registrations.get(0); + assertName(registration.f0); + Assert.assertEquals(5, registration.f1.getCount()); + } + }); + } + + @Test + public void testLatencyStatsOperator() { + testLatencyStats(LatencyStats.Granularity.OPERATOR, registrations -> { + Assert.assertEquals(2, registrations.size()); + + { + final Tuple2<String, Histogram> registration = registrations.get(0); + assertName(registration.f0, SOURCE_ID_1); + Assert.assertEquals(3, registration.f1.getCount()); + } + + { + final Tuple2<String, Histogram> registration = registrations.get(1); + assertName(registration.f0, SOURCE_ID_2); + Assert.assertEquals(2, registration.f1.getCount()); + } + }); + } + + @Test + public void testLatencyStatsSubtask() { + testLatencyStats(LatencyStats.Granularity.SUBTASK, registrations -> { + Assert.assertEquals(4, registrations.size()); + + { + final Tuple2<String, Histogram> registration = registrations.get(0); + assertName(registration.f0, SOURCE_ID_1, 0); + Assert.assertEquals(2, registration.f1.getCount()); + } + + { + final Tuple2<String, Histogram> registration = registrations.get(1); + assertName(registration.f0, SOURCE_ID_1, 1); + Assert.assertEquals(1, registration.f1.getCount()); + } + + { + final Tuple2<String, Histogram> registration = registrations.get(2); + assertName(registration.f0, SOURCE_ID_2, 2); + Assert.assertEquals(1, registration.f1.getCount()); + } + + { + final 
Tuple2<String, Histogram> registration = registrations.get(3); + assertName(registration.f0, SOURCE_ID_2, 3); + Assert.assertEquals(1, registration.f1.getCount()); + } + }); + } + + private static void testLatencyStats( + final LatencyStats.Granularity granularity, + final Consumer<List<Tuple2<String, Histogram>>> verifier) { + + final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + final TestMetricRegistry registry = new TestMetricRegistry(); + final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME); + + final LatencyStats latencyStats = new LatencyStats( + parentGroup, + MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(), + OPERATOR_SUBTASK_INDEX, + OPERATOR_ID, + granularity); + + latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0)); + latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0)); + latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1)); + latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2)); + latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3)); + + verifier.accept(registry.latencyHistograms); + } + + /** + * Removes all parts from the metric identifier preceding the latency-related parts. + */ + private static String sanitizeName(final String registrationName) { + return registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + PARENT_GROUP_NAME.length() + 1); + } + + private static void assertName(final String registrationName) { + final String sanitizedName = sanitizeName(registrationName); + Assert.assertEquals("operator_id." + OPERATOR_ID + + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX + + ".latency", sanitizedName); + } + + private static void assertName(final String registrationName, final OperatorID sourceId) { + final String sanitizedName = sanitizeName(registrationName); + Assert.assertEquals("source_id." + sourceId + + ".operator_id." 
+ OPERATOR_ID + + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX + + ".latency", sanitizedName); + } + + private static void assertName(final String registrationName, final OperatorID sourceId, final int sourceIndex) { + final String sanitizedName = sanitizeName(registrationName); + Assert.assertEquals("source_id." + sourceId + + ".source_subtask_index." + sourceIndex + + ".operator_id." + OPERATOR_ID + + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX + + ".latency", sanitizedName); + } + + private static class TestMetricRegistry implements MetricRegistry { + + private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4); + + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + if (metric instanceof Histogram) { + latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), (Histogram) metric)); + } + } + + @Override + public char getDelimiter() { + return '.'; + } + + @Override + public char getDelimiter(int index) { + return 0; + } + + @Override + public int getNumberReporters() { + return 0; + } + + @Override + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + + } + + @Override + public ScopeFormats getScopeFormats() { + return null; + } + + @Nullable + @Override + public String getMetricQueryServicePath() { + return null; + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services