[ https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897090#comment-15897090 ]
ASF GitHub Bot commented on FLINK-5090: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3348#discussion_r104388593 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class InputGateMetrics { + + private final SingleInputGate inputGate; + + private long lastTotal = -1; + + private int lastMin = -1; + + private int lastMax = -1; + + private float lastAvg = -1.0f; + + // ------------------------------------------------------------------------ + + private InputGateMetrics(SingleInputGate inputGate) { + this.inputGate = checkNotNull(inputGate); + } + + // ------------------------------------------------------------------------ + + // these methods are package private to make access from the nested classes faster + + long refreshAndGetTotal() { + long total; + if ((total = lastTotal) == -1) { + refresh(); + total = lastTotal; + } + + lastTotal = -1; + return total; + } + + int refreshAndGetMin() { + int min; + if ((min = lastMin) == -1) { + refresh(); + min = lastMin; + } + + lastMin = -1; + return min; + } + + int refreshAndGetMax() { + int max; + if ((max = lastMax) == -1) { + refresh(); + max = lastMax; + } + + lastMax = -1; + return max; + } + + float refreshAndGetAvg() { + float avg; + if ((avg = lastAvg) < 0.0f) { + refresh(); + avg = lastAvg; + } + + lastAvg = -1.0f; + return avg; + } + + private void refresh() { + long total = 0; + int min = Integer.MAX_VALUE; + int max = 0; + int count = 0; + + for (InputChannel channel : inputGate.getInputChannels().values()) { + if (channel.getClass() == RemoteInputChannel.class) { + RemoteInputChannel rc = (RemoteInputChannel) channel; + + int size = rc.unsynchronizedGetNumberOfQueuedBuffers(); + total += size; + min = Math.min(min, size); + max = Math.max(max, size); + count++; + } + } + + this.lastTotal = total; + this.lastMin = min; + this.lastMax = max; + this.lastAvg = total / (float) count; + } + + // ------------------------------------------------------------------------ + // Gauges to access the stats + // ------------------------------------------------------------------------ + + private Gauge<Long> getTotalQueueLenGauge() { + return new Gauge<Long>() { + @Override + public Long getValue() { + return refreshAndGetTotal(); + } + }; + } + + private Gauge<Integer> getMinQueueLenGauge() { + return new Gauge<Integer>() { + @Override + public Integer getValue() { + return refreshAndGetMin(); + } + }; + } + + private Gauge<Integer> getMaxQueueLenGauge() { + return new Gauge<Integer>() { + @Override + public Integer getValue() { + return refreshAndGetMax(); + } + }; + } + + private Gauge<Float> getAvgQueueLenGauge() { + return new Gauge<Float>() { + @Override + public Float getValue() { + return refreshAndGetAvg(); + } + }; + } + + // ------------------------------------------------------------------------ + // Static access + // ------------------------------------------------------------------------ + + public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) { + InputGateMetrics metrics = new InputGateMetrics(gate); + + group.gauge("total-queue-len", metrics.getTotalQueueLenGauge()); --- End diff -- These metric names aren't consistent with other metrics; they should be named ```totalQueueLen``` or similar. > Expose optionally detailed metrics about network queue lengths > -------------------------------------------------------------- > > Key: FLINK-5090 > URL: https://issues.apache.org/jira/browse/FLINK-5090 > Project: Flink > Issue Type: New Feature > Components: Metrics, Network > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > > For debugging purposes, it is important to have access to more detailed > metrics about the length of network input and output queues. -- This message was sent by Atlassian JIRA (v6.3.15#6346)