
Tzu-Li (Gordon) Tai commented on FLINK-6109:

Thanks for picking the issue up [~aitozi]! Of course, contributions are always welcome.

I have a few points I would like to discuss:

1. I've thought a bit about exposing "lag" as a metric directly in the Kafka 
consumer, but it seems like it could be a bit out-of-scope for the consumer 
itself. For the consumer to calculate this lag, it would essentially need to 
query Kafka for the current head offset, which can be very expensive.

2. Because of that, I suggest that we only add a `checkpointedOffset` metric, 
scoped by topic and partition id. We already have `currentOffsets` and 
`committedOffsets` metrics, so this would be a good addition overall, 
IMO. The "lag" should be determined by the user in conjunction with other Kafka 

3. You can perhaps build your work on top of 
https://github.com/apache/flink/pull/5214, which I'll be merging soon, to avoid 
any conflicts.

I would like this new metric to be added for the 1.5.0 release, if possible. 
How are you doing currently with the feature?
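
To illustrate point 2, here is a minimal sketch of how a user could derive lag themselves from a per-partition `checkpointedOffset` value and a head offset obtained from some other Kafka tooling. The class name `OffsetLagSketch`, the `key` helper, and the `topic-partition` key format are hypothetical and only for illustration. This is not Flink or Kafka API, and it assumes both offsets follow the same convention (for example, both are "next offset to read").

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetLagSketch {

    /** Hypothetical scope key: one entry per topic and partition id. */
    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /**
     * Computes lag per partition as (head offset - checkpointed offset),
     * clamped at zero. Partitions without a known head offset are skipped.
     */
    static Map<String, Long> lag(Map<String, Long> checkpointed,
                                 Map<String, Long> head) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : checkpointed.entrySet()) {
            Long h = head.get(e.getKey());
            if (h != null) {
                result.put(e.getKey(), Math.max(0L, h - e.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values as they might be read from the metric reporter (assumed).
        Map<String, Long> checkpointed = new HashMap<>();
        checkpointed.put(key("orders", 0), 120L);
        checkpointed.put(key("orders", 1), 95L);

        // Head offsets as obtained from external Kafka tooling (assumed).
        Map<String, Long> head = new HashMap<>();
        head.put(key("orders", 0), 150L);
        head.put(key("orders", 1), 95L);

        System.out.println(lag(checkpointed, head));
    }
}
```

Keeping the subtraction on the user side means the consumer never has to query Kafka for the head offset, and the user decides what threshold counts as "caught up".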

> Add "consumer lag" report metric to FlinkKafkaConsumer
> ------------------------------------------------------
>                 Key: FLINK-6109
>                 URL: https://issues.apache.org/jira/browse/FLINK-6109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: aitozi
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).
