[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978119#comment-15978119 ]
ASF GitHub Bot commented on FLINK-6013: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112613733 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>(); + private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>(); + private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List<String> configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName); + + List<String> tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + 
meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric instanceof Histogram) { + // No Histogram is registered + } else { + LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void open(MetricConfig config) { + client = new DatadogHttpClient(config.getString(API_KEY, null)); + LOGGER.info("Configured DatadogHttpReporter"); + + configTags = getTagsFromConfig(config.getString(TAGS, "")); + } + + @Override + public void close() { + client.close(); + LOGGER.info("Shut down DatadogHttpReporter"); + } + + @Override + public void report() { + DatadogHttpRequest request = new DatadogHttpRequest(); + + for (DGauge g : gauges.values()) { + try { + // Will throw exception if the Gauge is not of Number type + // Flink uses Gauge to store many types other than Number + g.getMetricValue(); + request.addGauge(g); + } catch (Exception e) { + // ignore if the Gauge is not of Number type --- End diff -- You can safely remove the metric in this case. 
> Add Datadog HTTP metrics reporter > --------------------------------- > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.3.0 > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot of other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > a Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting the Datadog HTTP > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)