Hi,

I am using the Beam Python SDK with the Flink runner, and I am trying to add
custom labels to the metrics.

It seems like the provided function (link
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/metrics/metric.py#L70>)
doesn't allow me to add labels.

```
@staticmethod
def counter(namespace, name):
```

Taking a deeper look at the code, it is using `MetricName`:
```
return Metrics.DelegatingCounter(MetricName(namespace, name))
```

and `MetricName` does support labels (link
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/metrics/metricbase.py#L52>
):
```
 def __init__(self, namespace, name, urn=None, labels=None):
```


Therefore I am trying to set the `MetricName` labels manually by creating
a new class that passes the labels through:
```
class MetricWithLabels(object):

    @staticmethod
    def counter(namespace, name, labels):
        namespace = beam.metrics.Metrics.get_namespace(namespace)
        return beam.metrics.Metrics.DelegatingCounter(
            beam.metrics.metricbase.MetricName(namespace, name, labels=labels))
```

However, when I test this in code:
```
class addExample(beam.DoFn):

    def __init__(self):
        beam.DoFn.__init__(self)
        # counter() expects (namespace, name, labels); the namespace was missing.
        self.example_counter = MetricWithLabels.counter(
            self.__class__, 'example_counter', {'label1': 'value1'})


    def process(self, element):
        self.example_counter.inc()
        yield element  # process() must produce an iterable of outputs
```

The final output still doesn't have the labels I created, and it seems like
the labels on the output metric are completely overwritten by the Flink runner:
```
flink_taskmanager_job_task_operator_custom_example_counter{job_id="41b6a8a793e47f8edaa91e2dccc90a5f",task_id="ecddd3e2ade3edda3cd8430d7b243742",task_attempt_id="72497124f6cc6d7c73674fcbba0664b4",host="172.19.0.5",operator_id="04ff9a876e7d6fb91800bbfb5f72ce25",operator_name="[3]{Convert
format, metrics example, logging}",task_name="Source: Reading message from
kafka/Read from kafka topic
['test']/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
-> Flat Map -> Map -> [1]Reading message from kafka/Read from kafka topic
['test']/Remove Kafka Metadata -> [3]{Convert format, metrics example,
logging}",task_attempt_num="0",job_name="None",tm_id="172.19.0.5:37847
-bb580c",subtask_index="0",}
```
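To check for the label programmatically rather than by eye, something like the following could be run against the scraped output. It is only a rough sketch: `parse_metric_line` and `missing_labels` are names made up here (they are not part of Beam or Flink), and it assumes the reporter emits Prometheus-style `name{key="value",...}` lines like the one above.

```python
import re

# Matches one label pair such as job_name="None"; values may contain escaped quotes.
_LABEL_RE = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_metric_line(line):
    """Split a Prometheus-style sample line into (metric_name, labels)."""
    name, brace, rest = line.partition("{")
    if not brace:
        # No label set at all: the name is the first whitespace-separated token.
        return name.split()[0], {}
    # rpartition tolerates "}" characters inside quoted label values.
    label_text = rest.rpartition("}")[0]
    return name.strip(), dict(_LABEL_RE.findall(label_text))


def missing_labels(line, expected):
    """Return the expected label pairs that are absent or different in the line."""
    _, labels = parse_metric_line(line)
    return {k: v for k, v in expected.items() if labels.get(k) != v}


# Shortened version of the line above, used purely as sample input.
sample = ('flink_taskmanager_job_task_operator_custom_example_counter'
          '{operator_name="[3]{Convert format, metrics example, logging}",'
          'job_name="None",subtask_index="0",}')
print(missing_labels(sample, {"label1": "value1"}))
```

A non-empty result means the custom label did not make it into the reported metric, which is what I am seeing.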

I wonder if there's a way to pass custom labels to the metrics with the
Flink runner, and if not, where the labels are being overwritten, so that
I might be able to update the code to support custom labels. Thanks.
