GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/4935
[Flink-7945][Metrics&connector]Fix per partition-lag metric lost in kafka connector ## What is the purpose of the change *When used KafkaConnector, we cant get per partition lag metric. But it has been exposed after kafka 0.10.2 [https://issues.apache.org/jira/browse/KAFKA-4381](url). After read the kafka code, i found that the per partition lag is register after `KafkaConsumer#poll` method be invoked, so i change the metric register time in flink , and after this, with kafka-connector10 and kafka-connector11 we can see the correct lag metric. * ## Brief change log - *Change the kafka metric register time in Flink kafka-connector* ## Verifying this change This change is already run through the test case ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink FLINK-7945 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4935.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4935 ---- commit 4f0e405fd0e697e67a0d4dc301d85244fc031086 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T12:51:46Z change the way to get metric in kafkaConsumerThread commit 183eea766ab6302c4f0813b2372f95a299ead67d Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T14:44:19Z overrride the createCallBridge method in kafkaFetcher10 commit d109efe7e2290eafdedf21fa7fbb4b8ac2d1bb58 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:11:41Z remove unused import commit 7dd26b6ddfe0f16ac57d9810dc46ae6b9fb34d18 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:13:34Z checkstyle commit 61db98e0469d85755d6cea560e110f61b6135739 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:29:47Z add debug log commit b55ab47b819dec90b18b8d57df5978aae0496e11 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:41:00Z remove log commit 64ae04f0846b6fcdc851e98a1df71e486bdf7762 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-10-31T15:43:44Z checkstyle commit bc16ae2ff89e63f71a050483bffb6d8a4389acd0 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T08:37:03Z change the location of register kafka metrics to flink commit 6fdf8e082669bd69fb730c32c5755660c59d2ab3 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T08:50:12Z checkstyle commit df2620926077c307510baaf74f0d10bf34fe6a1c Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T09:19:57Z use specific version poll method commit c7f44b99911665c974706c6025f69aa097657494 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T09:32:00Z method signature commit b41be18914c0ad8800f6faa30f1fcb0b995e40c0 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T13:52:39Z remove callbridge invoke commit c0dea5068cbb04763265b8f7dc6d80fc4b7cff49 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T14:29:31Z just for test commit e3df3a0705329d4e19f03a18b412e03664a62c9c Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:06:44Z judge poll success commit 3dbfa26ee6b46e6a1a6d708dd5bb759ff86014c8 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:47:07Z judge ConsumerRecords not empty commit 7f1f653e6346f0e09cf0582d312ae10d223ba92a Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T15:54:09Z checkstyle commit 7828945af3e560e782ee12f0cd11018d3f4e8dbf Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T16:11:14Z add flag to judge whether kafka has been registered commit 3dbd601ae20d1c5163a01e20b991b175f1180aff Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T16:15:24Z doc format commit f9b8fd4e2c9fc488456b141158d239ce2386a854 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T22:35:16Z add metrics exist judge and remove unsed code commit c14feacbe7db945f313de4a39dde13ecc1825924 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T22:39:15Z checkstyle commit 22131e05f682d73dc92e0f4f7501550cbe5cdccc Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T22:48:19Z init registeredMetrics commit 71a139875fbbd8bbcf3ebfb996aed59c298ac951 Author: minwenjun <minwen...@didichuxing.com> Date: 2017-11-01T22:50:51Z remove unsed import ---- ---