Hi guys, I'm retrying to send some app related custom metrics from Flink to Datadog via StatsD.
I followed https://ci.apache.org/projects/flink/flink-docs- release-1.2/monitoring/metrics.html to set up flink-conf.yaml and test code like this // flink-conf.yaml metrics.reporters: stsd metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter metrics.reporter.stsd.host: localhost metrics.reporter.stsd.port: 8125 metrics.scope.jm: bowen.jobmanager metrics.scope.jm.job: bowen.jobmanager.bowen.job_name metrics.scope.tm: bowen.taskmanager.<tm_id> metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name> metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.< subtask_index> metrics.scope.operator: bowen.taskmanager.<tm_id>.< job_name>.<operator_name>.<subtask_index> // test code, by modifying WordCount example public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; private Counter counter; @Override public void open(Configuration config) { getRuntimeContext() .getMetricGroup() .addGroup("bowen.test") .gauge("bowen.test.flink", new Gauge<Integer>() { @Override public Integer getValue() { return 100; } }); // test custom metrics counter = getRuntimeContext() .getMetricGroup() .addGroup("bowen.test") .counter("bowen.flink.metric"); // test custom metrics counter.inc(100); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } counter.inc(100); } } I found my Datadog received all system scope metrics, but non of my custom metric. I researched all night but gained no progress. What did I do wrong? Flink is able to handle custom metrics right? I'd really appreciate some guidance on sending custom metrics! Thank you very much! Bowen