[ https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357502#comment-15357502 ]
ASF GitHub Bot commented on FLINK-4116:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2158#discussion_r69173750

    --- Diff: docs/apis/common/index.md ---
    @@ -1350,3 +1350,397 @@ You may specify program arguments before the job is executed. The plan visualiza
     the execution plan before executing the Flink job.

     {% top %}
    +
    +Metrics
    +-------------------
    +
    +Flink exposes a metric system that allows gathering and exposing metrics to external systems.
    +
    +### Registering metrics
    +
    +You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
    +This method returns a `MetricGroup` object on which you can create and register new metrics.
    +
    +### Metric types
    +
    +Flink supports `Counters`, `Gauges` and `Histograms`.
    +
    +#### Counter
    +
    +A `Counter` is used to count something. The current value can be in- or decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
    +You can create and register a `Counter` by calling `counter(String name)` on a MetricGroup.
    +
    +{% highlight java %}
    +
    +public class MyMapper extends RichMapFunction<String, Integer> {
    +    private Counter counter;
    +
    +    @Override
    +    public void open(Configuration config) {
    +        // create and register a counter
    +        this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
    +        ...
    +    }
    +
    +    @Override
    +    public Integer map(String value) throws Exception {
    +        // increment counter
    +        this.counter.inc();
    +        ...
    +    }
    +}
    +
    +{% endhighlight %}
    +
    +Alternatively you can also use your own `Counter` implementation:
    +
    +{% highlight java %}
    +
    +public class MyMapper extends RichMapFunction<String, Integer> {
    +    ...
    +
    +    @Override
    +    public void open(Configuration config) {
    +        // register a custom counter
    +        this.counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter", new CustomCounter());
    +        ...
    +    }
    +    ...
    +}
    +
    +{% endhighlight %}
    +
    +#### Gauge
    +
    +A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface.
    +There is no restriction on the type of the returned value.
    +You can register a gauge by calling `gauge(String name, Gauge gauge)` on a MetricGroup.
    +
    +{% highlight java %}
    +
    +public class MyMapper extends RichMapFunction<String, Integer> {
    +    private int valueToExpose;
    +
    +    @Override
    +    public void open(Configuration config) {
    +        // register the gauge
    +        getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
    +            @Override
    +            public Integer getValue() {
    +                return valueToExpose;
    +            }
    +        });
    +        ...
    +    }
    +    ...
    +}
    +
    +{% endhighlight %}
    +
    +#### Histogram
    +
    +A `Histogram` measures the distribution of long values.
    +You can register one by calling `histogram(String name, Histogram histogram)` on a MetricGroup.
    +
    +{% highlight java %}
    +public class MyMapper extends RichMapFunction<Long, Integer> {
    +    private Histogram histogram;
    +
    +    @Override
    +    public void open(Configuration config) {
    +        // create and register a histogram
    +        this.histogram = getRuntimeContext().getMetricGroup().histogram("myHistogram", new MyHistogram());
    +        ...
    +    }
    +
    +    @Override
    +    public Integer map(Long value) throws Exception {
    +        this.histogram.update(value);
    +        ...
    +    }
    +}
    +{% endhighlight %}
    +
    +Flink only provides an interface for Histograms, but offers a wrapper that allows the use of Codahale/DropWizard Histograms (`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
    +This wrapper is contained in the `flink-metrics-dropwizard` module.
    +
    +### Scope
    +
    +Every registered metric has an automatically assigned scope which represents the entities it is tied to. By default a metric that is registered in a user function will be scoped to the operator in which the function runs, the task/job it belongs to and the taskManager/host it is executed on. This is referred to as the "system scope".
    +
    +You can define an additional "user scope" by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
    +
    +{% highlight java %}
    +
    +counter = getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
    +
    +{% endhighlight %}
    +
    +The name under which a metric is exported is based on both scopes and the name passed in the `counter()` call. The order is always \<system_scope>\<user_scope>\<name>.
    +
    +The system scope allows the reported name to contain contextual information like the name of the job it was registered in without requiring the user to pass this information manually.
    +
    +How the system scope affects the reported name for a metric can be modified by setting the following keys in the flink-conf.yaml.
    +Each of these keys expects a format string that may contain constants (e.g. "taskmanager") and variables (e.g. "\<task_id>") which will be replaced at runtime.
    +
    +- `metrics.scope.jm`
    +  - Default: \<host>.jobmanager
    +  - Applied to all metrics that were scoped to a jobmanager.
    +- `metrics.scope.jm.job`
    +  - Default: \<host>.jobmanager.\<job_name>
    +  - Applied to all metrics that were scoped to a jobmanager and job.
    +- `metrics.scope.tm`
    +  - Default: \<host>.taskmanager.\<tm_id>
    +  - Applied to all metrics that were scoped to a taskmanager.
    +- `metrics.scope.tm.job`
    +  - Default: \<host>.taskmanager.\<tm_id>.\<job_name>
    +  - Applied to all metrics that were scoped to a taskmanager and job.
    +- `metrics.scope.tm.task`
    +  - Default: \<host>.taskmanager.\<tm_id>.\<job_name>.\<task_name>.\<subtask_index>
    +  - Applied to all metrics that were scoped to a task.
    +- `metrics.scope.tm.operator`
    +  - Default: \<host>.taskmanager.\<tm_id>.\<job_name>.\<operator_name>.\<subtask_index>
    +  - Applied to all metrics that were scoped to an operator.
    +
    +Note that for metrics for which multiple formats may apply (like jm and jm.job) the most specific format takes precedence,
    +in this case jm.job.
    +
    +The hierarchical orders are as follows:
    +
    +jm < jm.job
    +
    +tm < tm.job < tm.task < tm.operator
    +
    +This hierarchy also defines which variables may be accessed. The `tm.operator` format may contain variables for jobs, whereas `tm.job` may not contain
    +variables for operators.
    +
    +There is no restriction on the order of variables.
    +
    +The default scope for operator metrics will result in a metric name akin to `localhost.taskmanager.1234.MyJob.MyOperator.0`.
    +
    +If you also want to include the task name, but omit the taskmanager information, you can specify the following format:
    +
    +`metrics.scope.tm.operator: \<host>.\<job_name>.\<task_name>.\<operator_name>.\<subtask_index>`
    +
    +This could create the name `localhost.MyJob.MySource_->_MyOperator.MyOperator.0`.
    +
    +The following is a list of all variables available:
    +
    +- JobManager: \<host>
    +- TaskManager: \<host>, \<tm_id>
    +- Job: \<job_id>, \<job_name>
    +- Task: \<task_id>, \<task_name>, \<task_attempt_id>, \<task_attempt_num>, \<subtask_index>
    +- Operator: \<operator_name>, \<subtask_index>
    +
    +### Reporter
    +
    +Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`.
    +
    +- `metrics.reporter.class`: The class of the reporter to use.
    +  - Example: org.apache.flink.metrics.reporter.JMXReporter
    +- `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter.
    +  - Example: --host localhost --port 9010
    +- `metrics.reporter.interval`: The interval between reports.
    +  - Example: 10 SECONDS
    +
    +You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface.
    +If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well.
    +
    +By default Flink uses JMX to expose metrics.
    +Reporters other than the JMXReporter are not part of the distribution and have to be added to the classpath manually, either by putting the jar into /lib
    +or including it in the job jar.
    +
    +Flink supports the following systems:
    +
    +#### JMX
    +
    +The port for JMX can be configured by setting the `metrics.jmx.port` key. This parameter expects either a single port
    +or a port range, with the default being 9010-9025. The port in use is shown in the relevant Job-/TaskManager log.
    +
    +#### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
    +Dependency:
    +{% highlight xml %}
    +<dependency>
    +  <groupId>org.apache.flink</groupId>
    +  <artifactId>flink-metrics-ganglia</artifactId>
    +  <version>1.1-SNAPSHOT</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Parameters:
    --- End diff --

    yes


> Document metrics
> ----------------
>
>                 Key: FLINK-4116
>                 URL: https://issues.apache.org/jira/browse/FLINK-4116
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Metrics
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before
> the 1.1 release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
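
Note on the Scope section of the quoted diff: the naming rule it describes (a format string whose variables are replaced at runtime, followed by the user scope and the metric name) can be sketched roughly as below. This is only an illustration, not Flink's implementation. The class `ScopeFormatSketch` and both method names are hypothetical, and joining the three parts with `.` is an assumption, since the diff only states the order \<system_scope>\<user_scope>\<name>.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of scope formats; not part of Flink. */
class ScopeFormatSketch {

    /** Replaces each <variable> in the format string; unknown variables are left as-is. */
    static String resolveSystemScope(String format, Map<String, String> variables) {
        String resolved = format;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            resolved = resolved.replace("<" + entry.getKey() + ">", entry.getValue());
        }
        return resolved;
    }

    /** Joins system scope, user scope groups and metric name, in that order ('.' is assumed). */
    static String metricName(String systemScope, String[] userScope, String name) {
        StringBuilder sb = new StringBuilder(systemScope);
        for (String group : userScope) {
            sb.append('.').append(group);
        }
        return sb.append('.').append(name).toString();
    }

    public static void main(String[] args) {
        // Example values taken from the names used in the quoted documentation.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("host", "localhost");
        variables.put("tm_id", "1234");
        variables.put("job_name", "MyJob");
        variables.put("operator_name", "MyOperator");
        variables.put("subtask_index", "0");

        // Default format quoted for metrics.scope.tm.operator.
        String format = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
        String systemScope = resolveSystemScope(format, variables);

        // prints localhost.taskmanager.1234.MyJob.MyOperator.0
        System.out.println(systemScope);
        // prints localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetrics.myCounter
        System.out.println(metricName(systemScope, new String[] {"MyMetrics"}, "myCounter"));
    }
}
```

The first printed name matches the default operator example given in the diff. The second shows where a user scope added via `addGroup("MyMetrics")` would land under the assumed delimiter.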