Sorry for joining this discussion late, but there is already a metric for the offset lag in our 0.9+ consumers. It's called "records-lag-max" (https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring), and it's exposed via Flink's metrics system. The only limitation is that it reports the maximum lag across all partitions, not per-partition values.
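To illustrate why a max-style metric hides per-partition detail, here is a minimal sketch. It assumes that a partition's lag is its high watermark (the latest offset available to the consumer) minus the consumer's current position. The function names and the sample topic "events" are hypothetical, and the sketch ignores the time-window aggregation the real Kafka metric performs.

```python
from typing import Dict, Tuple

# Hypothetical representation of a Kafka partition: (topic, partition number).
Partition = Tuple[str, int]


def per_partition_lag(high_watermarks: Dict[Partition, int],
                      positions: Dict[Partition, int]) -> Dict[Partition, int]:
    """Records available but not yet consumed, per partition (never negative)."""
    return {tp: max(0, high_watermarks[tp] - pos) for tp, pos in positions.items()}


def records_lag_max(lags: Dict[Partition, int]) -> int:
    """What a single max-style metric reports: only the worst partition."""
    return max(lags.values(), default=0)


# Usage with made-up offsets for a three-partition topic.
hw = {("events", 0): 1200, ("events", 1): 950, ("events", 2): 4000}
pos = {("events", 0): 1190, ("events", 1): 950, ("events", 2): 2500}
lags = per_partition_lag(hw, pos)
print(lags)
print(records_lag_max(lags))  # 1500
```

The maximum alone says partition 2 is far behind, but not that partitions 0 and 1 are nearly caught up. That is the gap a per-partition metric would fill.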
On Mon, Mar 20, 2017 at 3:43 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
> Hi,
>
> Thanks! The proposal sounds very good to us too.
>
> Bruno
>
> On Sun, 19 Mar 2017 at 10:57 Florian König <florian.koe...@micardo.com> wrote:
>
>> Thanks Gordon for the detailed explanation! That makes sense and explains the expected behaviour.
>>
>> The JIRA for the new metric also sounds very good. Can’t wait to have this in the Flink GUI (KafkaOffsetMonitor has some problems and stops working after 1-2 days, don’t know the reason yet).
>>
>> All the best,
>> Florian
>>
>> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> >
>> > @Florian
>> > The 0.9 / 0.10 version and the 0.8 version currently behave a bit differently for offset committing.
>> >
>> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. settings will be completely ignored and overwritten before being used to instantiate the internal Kafka clients, hence committing will only happen on Flink checkpoints.
>> >
>> > In 0.8, this isn’t the case. Both automatic periodic committing and committing on checkpoints can take place. That’s perhaps why you’re observing the 0.8 consumer committing more frequently.
>> >
>> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you can take a look at https://github.com/apache/flink/pull/3527.
>> >
>> > - Gordon
>> >
>> > On March 17, 2017 at 6:07:38 PM, Florian König (florian.koe...@micardo.com) wrote:
>> >
>> >> Why is that so? The checkpoint contains the Kafka offset and would be able to start reading wherever it left off, regardless of any offset stored in Kafka or Zookeeper. Why is the offset not committed regularly, independently from the checkpointing? Or did I misconfigure anything?
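The commit behaviour Gordon describes can be modelled with a small simulation. This is a simplified sketch, not Flink code. It assumes Kafka auto-commit is enabled in the consumer properties and that a checkpoint commit happens exactly at each checkpoint interval, ignoring completion latency. The function `commit_times` and its parameters are hypothetical.

```python
from typing import List


def commit_times(connector: str, checkpointing: bool,
                 auto_commit_interval_ms: int, checkpoint_interval_ms: int,
                 run_ms: int) -> List[int]:
    """Times (ms) at which offsets get committed back to Kafka/ZooKeeper.

    Simplified model of the pre-1.3 behaviour described in the thread:
    - 0.9 / 0.10 with checkpointing: auto-commit settings are overridden,
      so offsets are committed only on checkpoints.
    - 0.8: periodic auto-commit and checkpoint commits can both happen.
    - Without checkpointing: only periodic auto-commit.
    """
    if connector not in ("0.8", "0.9", "0.10"):
        raise ValueError(f"unknown connector version: {connector}")

    periodic = set(range(auto_commit_interval_ms, run_ms + 1, auto_commit_interval_ms))
    on_checkpoint = (set(range(checkpoint_interval_ms, run_ms + 1, checkpoint_interval_ms))
                     if checkpointing else set())

    if checkpointing and connector != "0.8":
        times = on_checkpoint
    else:
        times = periodic | on_checkpoint
    return sorted(times)


# One minute of runtime, 5 s auto-commit interval, 60 s checkpoint interval.
print(commit_times("0.10", True, 5000, 60000, 60000))
print(len(commit_times("0.8", True, 5000, 60000, 60000)))
```

Under these assumptions the 0.10 consumer commits once per minute while the 0.8 consumer commits every five seconds, which matches the "0.8 commits more frequently" observation in the thread.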