[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ]
Shuyi Chen edited comment on FLINK-11912 at 3/14/19 12:22 AM:
--------------------------------------------------------------

Hi [~tzulitai], I've attached a proposed tentative (experimental) change [here|https://github.com/apache/flink/commit/c37394acc01ea5a0c4e2681319ecbfaa63beead3]. Could you please take a look and let me know whether this is the right approach? Thanks a lot.

was (Author: suez1224):
Hi [~tzulitai], I've attached a proposed tentative (experimental) change [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417]. Could you please take a look and let me know whether this is the right approach? Thanks a lot.

> Expose per partition Kafka lag metric in Flink Kafka connector
> --------------------------------------------------------------
>
>                 Key: FLINK-11912
>                 URL: https://issues.apache.org/jira/browse/FLINK-11912
>             Project: Flink
>          Issue Type: New Feature
>      Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.7.2
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Major
>
> In production, it is important to expose the per-partition Kafka lag metric so that users can diagnose which Kafka partition is lagging. Although the per-partition lag metrics are available in KafkaConsumer after 0.10.2, Flink was not able to register them properly, because the metrics only become available after the consumer starts polling data from partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) In the fetch loop, as the KafkaConsumer discovers new partitions, add the MetricName of each per-partition metric we want to register to manualRegisteredMetricSet.
> 3) In the fetch loop, check whether manualRegisteredMetricSet is empty. If not, search for the corresponding metrics in the KafkaConsumer; for each one found, register it and remove its entry from manualRegisteredMetricSet.
> The overhead of the above approach is bounded and is only incurred while new partitions are being discovered; each metric is registered exactly once, as soon as the KafkaConsumer exposes it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
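A minimal sketch of the lazy-registration bookkeeping proposed in the three steps above. All names here (the registrar class, the `topic-partition.records-lag` metric-name format, plain `Map`/`Set` in place of Flink's `MetricGroup` and Kafka's `MetricName`) are illustrative assumptions, not the actual change in the linked commit:

```java
import java.util.*;

// Hypothetical, simplified model of the proposed "manualRegisteredMetricSet":
// lag metrics are queued when a partition is discovered and registered lazily
// once the consumer actually starts exposing them.
class LazyLagMetricRegistrar {
    // Metric names still waiting to be registered (the manualRegisteredMetricSet).
    private final Set<String> pending = new HashSet<>();
    // Metrics registered so far (the real code would call MetricGroup.gauge(...)
    // against Flink's metric system; a plain map stands in for that here).
    private final Map<String, Double> registered = new HashMap<>();

    // Step 2: as the fetch loop discovers a new partition, queue the lag metric
    // name for it. The "topic-partition.records-lag" format is an assumption.
    void onPartitionDiscovered(String topic, int partition) {
        pending.add(topic + "-" + partition + ".records-lag");
    }

    // Step 3: on each fetch iteration, if anything is still pending, look it up
    // in the metrics the KafkaConsumer currently exposes; register each metric
    // found and drop it from the pending set.
    void tryRegisterPending(Map<String, Double> consumerMetrics) {
        if (pending.isEmpty()) {
            return; // bounded overhead: no work once everything is registered
        }
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String name = it.next();
            Double value = consumerMetrics.get(name);
            if (value != null) {
                registered.put(name, value);
                it.remove();
            }
        }
    }

    Set<String> pendingNames() { return pending; }
    Set<String> registeredNames() { return registered.keySet(); }
}

public class Main {
    public static void main(String[] args) {
        LazyLagMetricRegistrar r = new LazyLagMetricRegistrar();
        r.onPartitionDiscovered("orders", 0);
        r.onPartitionDiscovered("orders", 1);

        // First fetch: the consumer only exposes the lag metric for partition 0,
        // so partition 1's metric stays pending.
        Map<String, Double> consumerMetrics = new HashMap<>();
        consumerMetrics.put("orders-0.records-lag", 42.0);
        r.tryRegisterPending(consumerMetrics);
        System.out.println("registered=" + r.registeredNames());
        System.out.println("pending=" + r.pendingNames());

        // A later fetch: partition 1's metric appears and is registered too.
        consumerMetrics.put("orders-1.records-lag", 7.0);
        r.tryRegisterPending(consumerMetrics);
        System.out.println("pending after second fetch: " + r.pendingNames().size());
    }
}
```

The point of the pending-set check at the top of `tryRegisterPending` is the bounded-overhead claim in the description: once every discovered partition's metric has been registered, each fetch iteration pays only an `isEmpty()` test.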