Hi Gordon,

Thanks for the suggestions. I think in general it would be good to make this periodic (with a configurable interval) and also to show the lag of the latest committed (checkpointed) offset. I think it's better to show both rather than only one of them, as both carry useful information.
So we would have the current lag per partition (updated, for instance, every second) and the lag at the latest checkpoint per partition, both in an easily queryable way.

Gyula

Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Fri, Mar 17, 2017, 14:24):

> One other possibility for reporting “consumer lag” is to update the metric
> only at a configurable interval, if use cases can tolerate a certain delay
> in realizing the consumer has caught up.
>
> Or we could also piggyback the consumer lag update onto the checkpoint
> interval. I think in the case that Gyula described, users might additionally
> want to stop the old job only when the new job has “caught up with partition
> head” && “the offsets used to determine the lag are secured in a checkpoint”.
> That should address syncing the consumer lag calculation with the commit
> frequency discussed here.
>
> What do you think?
>
>
> On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi,
>
> I was thinking somewhat similar to what Ufuk suggested,
> but if we want to report a “consumer lag” metric, we would
> essentially need to request the latest offset on every record fetch
> (because the latest offset advances as well), so I wasn’t so sure
> of the performance tradeoffs there (the partition metadata request
> and record requests require 2 separate calls, so we would
> basically be doubling the request calls to Kafka just for this).
>
> If we just want a metric that can show whether or not the
> consumer has caught up with the “latest offset at the time the
> consumer starts”, it would definitely be feasible. I wonder
> how we want to name this metric though.
> @Gyula @Florian @Bruno do you think this is enough for your needs?
>
> - Gordon
>
> On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (u...@apache.org) wrote:
>
> @Gordon: What's your take on integrating this directly into the consumer?
> Can't we poll the latest offset via the Offset API [1] and
> report a consumer lag metric for the consumer group of the
> application? This we could also display in the web frontend.
>
> In the first version, users would have to poll this metric manually.
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest
>
> On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <brunoara...@gmail.com>
> wrote:
> > Hi,
> >
> > We are interested in this too. So far we flag the records with timestamps
> > at different points of the pipeline and use metric gauges to measure
> > latency between the different components, but it would be good to know if
> > there is something more specific to Kafka that we can do out of the box
> > in Flink.
> >
> > Cheers,
> >
> > Bruno
> >
> > On Fri, 17 Mar 2017 at 10:07 Florian König <florian.koe...@micardo.com>
> > wrote:
> >>
> >> Hi,
> >>
> >> thank you Gyula for posting that question. I’d also be interested in how
> >> this could be done.
> >>
> >> You mentioned the dependency on the commit frequency. I’m using
> >> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka
> >> consumer, a job's offsets as shown in the diagrams were updated a lot
> >> more regularly than the checkpointing interval. With the 10 consumer, a
> >> commit is only made after a successful checkpoint (or so it seems).
> >>
> >> Why is that so? The checkpoint contains the Kafka offset and would be
> >> able to start reading wherever it left off, regardless of any offset
> >> stored in Kafka or Zookeeper. Why is the offset not committed regularly,
> >> independently from the checkpointing? Or did I misconfigure anything?
> >>
> >> Thanks
> >> Florian
> >>
> >>
> >> > On 17.03.2017 at 10:26, Gyula Fóra <gyf...@apache.org> wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I am wondering if anyone has some nice suggestions on what would be the
> >> > simplest/best way of telling if a job is caught up with the Kafka input.
> >> > An alternative question would be how to tell if a job is caught up to
> >> > another job reading from the same topic.
> >> >
> >> > The first thing that comes to my mind is looking at the offsets Flink
> >> > commits to Kafka. However, this will only work if every job uses a
> >> > different group id, and even then it is not very reliable depending on
> >> > the commit frequency.
> >> >
> >> > The use case I am trying to solve is fault tolerant update of a job, by
> >> > taking a savepoint for job1, starting job2 from the savepoint, waiting
> >> > until it catches up and then killing job1.
> >> >
> >> > Thanks for your input!
> >> > Gyula
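
For illustration, the two lag figures discussed in this thread (current lag per partition, and lag at the latest checkpoint per partition) could be computed roughly as follows. This is only a sketch under assumptions: `ConsumerLagSketch`, `lagPerPartition` and `caughtUp` are hypothetical names, not Flink or Kafka APIs, and the code assumes the partition head offsets and the consumer's offsets have already been fetched by some other means and are keyed by partition id. Treating a partition with no known consumer offset as fully unread is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink or the Kafka client.
final class ConsumerLagSketch {

    // Lag per partition = head (latest) offset minus the consumer's offset,
    // floored at zero. Assumption: a partition with no known consumer offset
    // is treated as if nothing had been read from it yet.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> latestOffsets,
                                              Map<Integer, Long> consumerOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            long consumed = consumerOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
        }
        return lag;
    }

    // True if no partition lags behind its head by more than maxLag records.
    static boolean caughtUp(Map<Integer, Long> lag, long maxLag) {
        for (long partitionLag : lag.values()) {
            if (partitionLag > maxLag) {
                return false;
            }
        }
        return true;
    }
}
```

Calling `lagPerPartition` once with the offsets the consumer is currently at and once with the offsets from the latest completed checkpoint gives both figures. The stricter condition Gordon describes (caught up with the partition head and secured in a checkpoint) would then correspond to `caughtUp` on the checkpoint-based lag.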