[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15465112#comment-15465112 ]
ASF GitHub Bot commented on FLINK-4389:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2363#discussion_r77528207

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Utility class for the serialization of metrics.
+ */
+public class MetricDumpSerialization {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
+
+	private MetricDumpSerialization() {
+	}
+
+	// ===== Serialization =============================================================================================
+	public static class MetricDumpSerializer {
+		private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+		private DataOutputStream dos = new DataOutputStream(baos);
+
+		/**
+		 * Serializes the given metrics and returns the resulting byte array.
+		 *
+		 * @param counters counters to serialize
+		 * @param gauges gauges to serialize
+		 * @param histograms histograms to serialize
+		 * @return byte array containing the serialized metrics
+		 * @throws IOException
+		 */
+		public byte[] serialize(Map<Counter, Tuple2<QueryScopeInfo, String>> counters, Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges, Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms) throws IOException {
+			baos.reset();
+			dos.writeInt(counters.size());
+			dos.writeInt(gauges.size());
+			dos.writeInt(histograms.size());
+
+			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeCounter(dos, entry.getKey());
+			}
+
+			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeGauge(dos, entry.getKey());
+			}
+
+			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+				serializeMetricInfo(dos, entry.getValue().f0);
+				serializeString(dos, entry.getValue().f1);
+				serializeHistogram(dos, entry.getKey());
+			}
+			return baos.toByteArray();
+		}
+
+		public void close() {
+			try {
+				dos.close();
+			} catch (Exception e) {
+				LOG.debug("Failed to close OutputStream.", e);
+			}
+			try {
+				baos.close();
+			} catch (Exception e) {
+				LOG.debug("Failed to close OutputStream.", e);
+			}
+		}
+	}
+
+	private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
+		serializeString(dos, info.scope);
+		dos.writeByte(info.getCategory());
+		switch (info.getCategory()) {
+			case INFO_CATEGORY_JM:
+				break;
+			case INFO_CATEGORY_TM:
+				String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+				serializeString(dos, tmID);
+				break;
+			case INFO_CATEGORY_JOB:
+				QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+				serializeString(dos, jobInfo.jobID);
+				break;
+			case INFO_CATEGORY_TASK:
+				QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+				serializeString(dos, taskInfo.jobID);
+				serializeString(dos, taskInfo.vertexID);
+				dos.writeInt(taskInfo.subtaskIndex);
+				break;
+			case INFO_CATEGORY_OPERATOR:
+				QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+				serializeString(dos, operatorInfo.jobID);
+				serializeString(dos, operatorInfo.vertexID);
+				dos.writeInt(operatorInfo.subtaskIndex);
+				serializeString(dos, operatorInfo.operatorName);
+				break;
+		}
+	}
+
+	private static void serializeString(DataOutputStream dos, String string) throws IOException {
+		byte[] bytes = string.getBytes();
+		dos.writeInt(bytes.length);
+		dos.write(bytes);
+	}
+
+	private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
+		dos.writeLong(counter.getCount());
+	}
+
+	private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
+		serializeString(dos, gauge.getValue().toString());
+	}
+
+	private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+		HistogramStatistics stat = histogram.getStatistics();
+
+		dos.writeLong(stat.getMin());
+		dos.writeLong(stat.getMax());
+		dos.writeDouble(stat.getMean());
+		dos.writeDouble(stat.getQuantile(0.5));
+		dos.writeDouble(stat.getStdDev());
+		dos.writeDouble(stat.getQuantile(0.75));
+		dos.writeDouble(stat.getQuantile(0.90));
+		dos.writeDouble(stat.getQuantile(0.95));
+		dos.writeDouble(stat.getQuantile(0.98));
+		dos.writeDouble(stat.getQuantile(0.99));
+		dos.writeDouble(stat.getQuantile(0.999));
+	}
+
+	// ===== Deserialization ===========================================================================================
--- End diff --

Or the dashes replaced with `=`

> Expose metrics to Webfrontend
> -----------------------------
>
>                 Key: FLINK-4389
>                 URL: https://issues.apache.org/jira/browse/FLINK-4389
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics, Webfrontend
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
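For readers of the diff above, the sketch below illustrates the byte layout written by `MetricDumpSerializer#serialize`: three counts first, then per metric the scope info, the metric name, and the value. It is a minimal reading-side sketch, not the pull request's own deserialization code (which belongs to the same file but is not part of the quoted diff); the class and method names (`MetricDumpReaderSketch`, `skipMetricInfo`, `printCounters`) are chosen here for illustration only.

```java
package org.apache.flink.runtime.metrics.dump;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;

/**
 * Illustrative reader for the layout written by MetricDumpSerializer#serialize.
 * Not part of the pull request; names and structure are chosen for this sketch only.
 */
public final class MetricDumpReaderSketch {

	/** Mirrors serializeString: an int length followed by the raw bytes. */
	private static String deserializeString(DataInputStream dis) throws IOException {
		byte[] bytes = new byte[dis.readInt()];
		dis.readFully(bytes);
		// Assumes the reader uses the same default charset as the writer,
		// since serializeString calls String.getBytes() without an explicit charset.
		return new String(bytes);
	}

	/** Mirrors serializeMetricInfo: scope string, category byte, category-specific fields. */
	private static void skipMetricInfo(DataInputStream dis) throws IOException {
		deserializeString(dis);          // QueryScopeInfo.scope
		byte category = dis.readByte();
		switch (category) {
			case INFO_CATEGORY_JM:
				break;
			case INFO_CATEGORY_TM:
				deserializeString(dis);  // taskManagerID
				break;
			case INFO_CATEGORY_JOB:
				deserializeString(dis);  // jobID
				break;
			case INFO_CATEGORY_TASK:
				deserializeString(dis);  // jobID
				deserializeString(dis);  // vertexID
				dis.readInt();           // subtaskIndex
				break;
			case INFO_CATEGORY_OPERATOR:
				deserializeString(dis);  // jobID
				deserializeString(dis);  // vertexID
				dis.readInt();           // subtaskIndex
				deserializeString(dis);  // operatorName
				break;
		}
	}

	/** Prints all counters contained in a dump produced by MetricDumpSerializer#serialize. */
	public static void printCounters(byte[] dump) throws IOException {
		DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dump));

		int numCounters = dis.readInt();
		dis.readInt();                   // gauge count, not needed for counters
		dis.readInt();                   // histogram count, not needed for counters

		// Counters are written first, so they can be read without decoding the rest.
		for (int i = 0; i < numCounters; i++) {
			skipMetricInfo(dis);                    // Tuple2.f0: QueryScopeInfo
			String name = deserializeString(dis);   // Tuple2.f1: metric name
			long count = dis.readLong();            // value written by serializeCounter
			System.out.println(name + " = " + count);
		}
	}
}
```

Two properties of the format make such a reader simple: strings are length-prefixed, so fields can be skipped without sentinel values, and the fixed section order (counters, then gauges, then histograms) with up-front counts lets a consumer stop after the sections it cares about.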