[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shuyi Chen updated FLINK-11912:
-------------------------------
    Summary: Expose per partition Kafka lag metric in Flink Kafka connector  (was: Expose per partition Kafka lag metric in Flink Kafka consumer)

> Expose per partition Kafka lag metric in Flink Kafka connector
> --------------------------------------------------------------
>
>                 Key: FLINK-11912
>                 URL: https://issues.apache.org/jira/browse/FLINK-11912
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.7.2
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Major
>
> In production, it is important to expose the per-partition Kafka lag metric
> so that users can diagnose which Kafka partition is lagging. However,
> although the per-partition lag metrics are available in KafkaConsumer, Flink
> is not able to register them properly, because the metrics only become
> available after the consumer starts polling data from the partitions. I
> would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) In the fetch loop, as KafkaConsumer discovers new partitions, manually add
> the MetricName of each partition metric that we want to register to
> manualRegisteredMetricSet.
> 3) In the fetch loop, check whether manualRegisteredMetricSet is empty. If
> not, search for the pending metrics among those available in KafkaConsumer
> and, for each one found, register it and remove its entry from
> manualRegisteredMetricSet.
> The overhead of the above approach is bounded and is incurred only when new
> partitions are discovered; registration is done once the KafkaConsumer has
> exposed the metrics.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
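
The three-step fix proposed in the description could be sketched roughly as below. This is only an illustration of the deferred-registration idea, not the actual connector code: the class name LagMetricRegistrar, its methods, the string metric key "<topic>-<partition>.records-lag", and the plain maps standing in for KafkaConsumer's metrics and Flink's metric group are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of deferred registration of per-partition lag metrics. */
public class LagMetricRegistrar {

    // Step 1: names of metrics that are wanted but not yet exposed by the consumer.
    private final Set<String> manualRegisteredMetricSet = new HashSet<>();

    // Stand-in for the Flink metric group (assumption: a plain map of gauges).
    private final Map<String, Supplier<Double>> registered = new HashMap<>();

    /** Step 2: remember the lag metric of a newly discovered partition. */
    public void onPartitionDiscovered(String topic, int partition) {
        // Hypothetical key format, used only for this example.
        String name = topic + "-" + partition + ".records-lag";
        if (!registered.containsKey(name)) {
            manualRegisteredMetricSet.add(name);
        }
    }

    /**
     * Step 3: called on every fetch-loop iteration. consumerMetrics stands in
     * for the metrics currently exposed by the consumer.
     */
    public void tryRegister(Map<String, Supplier<Double>> consumerMetrics) {
        if (manualRegisteredMetricSet.isEmpty()) {
            return; // common case: nothing pending, so the loop pays almost nothing
        }
        Iterator<String> it = manualRegisteredMetricSet.iterator();
        while (it.hasNext()) {
            String name = it.next();
            Supplier<Double> metric = consumerMetrics.get(name);
            if (metric != null) {
                registered.put(name, metric); // register once the metric exists
                it.remove();                  // and stop looking for it
            }
        }
    }

    /** Number of metrics still waiting to be exposed by the consumer. */
    public int pendingCount() {
        return manualRegisteredMetricSet.size();
    }

    /** Whether the metric with the given name has been registered. */
    public boolean isRegistered(String name) {
        return registered.containsKey(name);
    }
}
```

In this sketch a caller would invoke onPartitionDiscovered whenever a partition is assigned and tryRegister once per fetch-loop iteration. Because the pending set only grows when a partition is discovered and shrinks as metrics appear, the extra work per iteration stays bounded, which matches the overhead argument in the description.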