[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-6109: --------------------------------------- Description: This is a feature discussed in this ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. As discussed, we can expose two kinds of "consumer lag" metrics for this: - *current consumer lag for partition:* the current difference between the latest offset and the last collected record record of a partition. This metric is calculated and updated at a configurable interval. This metric basically serves as an indicator of how the consumer is keeping up with the head of partitions. I propose to name this {{currentOffsetLag}}. - *Consumer lag of last checkpoint:* the difference between the latest offset and the offset stored in the checkpoint of a partition. This metric is only updated when checkpoints are completed. It serves as an indicator of how much data may need to be replayed in case of a failure. I propose to name this {{lastCheckpointedOffsetLag}}. I don't think it is reasonable to define a metric of whether or not a consumer has "caught up" with the HEAD. That would imply a threshold for the offset difference. We should probably leave this "caught up" logic for the user to determine themselves when they query this metric. The granularity of the metric is per-FlinkKafkaConsumer, and independent of the consumer group.id used (the offset used to calculate consumer lag is the internal offset state of the FlinkKafkaConsumer, not the consumer group's committed offsets in Kafka). was: This is a feature discussed in this ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. As discussed, we can expose two kinds of "consumer lag" metrics for this: - **current consumer lag for partition:** the current difference between the latest offset and the last collected record record of a partition. This metric is calculated and updated at a configurable interval. This metric basically serves as an indicator of how the consumer is keeping up with the head of partitions. I propose to name this {{currentOffsetLag}}. - **Consumer lag of last checkpoint:** the difference between the latest offset and the offset stored in the checkpoint of a partition. This metric is only updated when checkpoints are completed. It serves as an indicator of how much data may need to be replayed in case of a failure. I propose to name this {{lastCheckpointedOffsetLag}}. I don't think it is reasonable to define a metric of whether or not a consumer has "caught up" with the HEAD. That would imply a threshold for the offset difference. We should probably leave this "caught up" logic for the user to determine themselves when they query this metric. The granularity of the metric is per-FlinkKafkaConsumer, and independent of the consumer group.id used (the offset used to calculate consumer lag is the internal offset state of the FlinkKafkaConsumer, not the consumer group's committed offsets in Kafka). > Add "consumer lag" report metric to FlinkKafkaConsumer > ------------------------------------------------------ > > Key: FLINK-6109 > URL: https://issues.apache.org/jira/browse/FLINK-6109 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors > Reporter: Tzu-Li (Gordon) Tai > > This is a feature discussed in this ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. > As discussed, we can expose two kinds of "consumer lag" metrics for this: > - *current consumer lag for partition:* the current difference between the > latest offset and the last collected record record of a partition. This > metric is calculated and updated at a configurable interval. This metric > basically serves as an indicator of how the consumer is keeping up with the > head of partitions. I propose to name this {{currentOffsetLag}}. > - *Consumer lag of last checkpoint:* the difference between the latest > offset and the offset stored in the checkpoint of a partition. This metric is > only updated when checkpoints are completed. It serves as an indicator of how > much data may need to be replayed in case of a failure. I propose to name > this {{lastCheckpointedOffsetLag}}. > I don't think it is reasonable to define a metric of whether or not a > consumer has "caught up" with the HEAD. That would imply a threshold for the > offset difference. We should probably leave this "caught up" logic for the > user to determine themselves when they query this metric. > The granularity of the metric is per-FlinkKafkaConsumer, and independent of > the consumer group.id used (the offset used to calculate consumer lag is the > internal offset state of the FlinkKafkaConsumer, not the consumer group's > committed offsets in Kafka). -- This message was sent by Atlassian JIRA (v6.3.15#6346)