Hi, I am using beam python SDK with flink runner, and I am trying to add custom labels to the metrics.
It seems like the provided function (link <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/metrics/metric.py#L70>) doesn't allow me to add labels. ``` @staticmethod def counter(namespace, name): ``` Taking a deeper look on the code, it is using `MetricName`: ``` return Metrics.DelegatingCounter(MetricName(namespace, name)) ``` and the MetricName does support for labels (link <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/metrics/metricbase.py#L52> ): ``` def __init__(self, namespace, name, urn=None, labels=None): ``` Therefore I am trying to manually update the metricName labels by creating a new class that does pass the labels: ``` class MetricWithLabels(object): @staticmethod def counter(namespace, name, labels): namespace = beam.metrics.Metrics.get_namespace(namespace) return beam.metrics.Metrics.DelegatingCounter(beam.metrics.metricbase.MetricName(namespace, name, labels=labels)) ``` However, when I test this in code ``` class addExample(beam.DoFn): def __init__(self): beam.DoFn.__init__(self) self.example_counter = MetricWithLabels.counter('example_counter', {'label1': 'value1'}) def process(self, element): self.example_counter.inc() return element ``` The final output still don't have the labels I created, and it seems like the output metrics is having labels completely overwrite by Flink runner.: ``` flink_taskmanager_job_task_operator_custom_example_counter{job_id="41b6a8a793e47f8edaa91e2dccc90a5f",task_id="ecddd3e2ade3edda3cd8430d7b243742",task_attempt_id="72497124f6cc6d7c73674fcbba0664b4",host="172.19.0.5",operator_id="04ff9a876e7d6fb91800bbfb5f72ce25",operator_name="[3]{Convert format, metrics example, logging}",task_name="Source: Reading message from kafka/Read from kafka topic ['test']/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]Reading message from kafka/Read from kafka topic ['test']/Remove Kafka Metadata -> [3]{Convert format, metrics example, logging}",task_attempt_num="0",job_name="None",tm_id="172.19.0.5:37847 -bb580c",subtask_index="0",} ``` I wonder if there's a way that we can pass in the custom labels to the metrics with flink runner, and if not, where are the labels being overwritten so that I might be able to update the code to support custom labels. Thanks
