Hi,

You can get the lag as a metric here:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#metrics()
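Below is a minimal sketch of picking the lag out of that metrics map. It assumes the consumer reports its maximum lag as `records-lag-max` in the `consumer-fetch-manager-metrics` group. That value is the largest per-partition lag seen in the current sample window, so it can be a little stale. Per-partition lag metric names have changed between client versions, so check what your client actually reports. `LagMetrics`, `findMetric`, `FETCH_GROUP` and `MAX_LAG` are hypothetical helpers, not Kafka API. They are written against plain `Map`/`Function` types so the sketch runs without the Kafka jar.

```java
import java.util.Map;
import java.util.OptionalDouble;
import java.util.function.Function;

// Hypothetical helper, not part of the Kafka client API.
final class LagMetrics {
    // Assumed group/name of the consumer's max-lag metric; verify against your client version.
    static final String FETCH_GROUP = "consumer-fetch-manager-metrics";
    static final String MAX_LAG = "records-lag-max";

    private LagMetrics() {}

    /**
     * Returns the value of the first metric whose group and name match, or empty if none
     * matches or its value is not a finite number (for example, before any samples exist).
     * The extractor functions keep this independent of Kafka types.
     */
    static <K, V> OptionalDouble findMetric(Map<K, ? extends V> metrics,
                                            Function<? super K, String> group,
                                            Function<? super K, String> name,
                                            Function<? super V, ? extends Number> value,
                                            String wantedGroup,
                                            String wantedName) {
        for (Map.Entry<K, ? extends V> e : metrics.entrySet()) {
            K key = e.getKey();
            if (wantedGroup.equals(group.apply(key)) && wantedName.equals(name.apply(key))) {
                // Only the matching entry's value is converted, so other metric types are never touched.
                Number v = value.apply(e.getValue());
                if (v != null && Double.isFinite(v.doubleValue())) {
                    return OptionalDouble.of(v.doubleValue());
                }
                return OptionalDouble.empty();
            }
        }
        return OptionalDouble.empty();
    }
}
```

With a real consumer, a call along the lines of `LagMetrics.findMetric(consumer.metrics(), MetricName::group, MetricName::name, m -> (Double) m.metricValue(), LagMetrics.FETCH_GROUP, LagMetrics.MAX_LAG)` should work on newer clients. The 0.10.x clients linked above expose `Metric.value()` instead of `metricValue()`. The lag metric is derived from the fetch responses the consumer already receives, so reading it should not add a broker round trip.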
BR, Stas.

2017-10-13 2:08 GMT+02:00 Stephen Powis <spo...@salesforce.com>:
> So I have the same use case as the original poster and had the same
> issue with the older 0.10.x clients: not being able to determine the
> tail offsets even though the fetch response contains the high
> watermark (HW).
>
> From what I could understand by tracing through the 0.11.0 consumer
> code, it makes additional API/network calls to the Kafka cluster to
> retrieve the tail/end offset information. Assuming I haven't
> misread/misunderstood the code, for most use cases this probably makes
> sense. But in time-sensitive code, it bummed me out to have to make
> additional calls to get that information when technically it's
> already available via the HW property in the fetches; the consumer
> just has no access to it.
>
> Is there any talk about exposing this property somewhere in the
> consumers in the future?
>
> On Fri, Oct 13, 2017 at 8:35 AM, Manan G <manan....@gmail.com> wrote:
>
> > Never mind. The 0.11 KafkaConsumer seems to have added an "endOffsets"
> > API!
> >
> > On Thu, Oct 12, 2017 at 3:31 PM, Manan G <manan....@gmail.com> wrote:
> >
> > > For my use case, I need to figure out the lag within the Java
> > > consumer itself that is consuming some topic. Ideally, the consumer
> > > application would monitor the lag every minute or so and take some
> > > action on its own if the consumer falls behind (i.e. spin up more
> > > threads to process records; my use case does not care about record
> > > order). For our purpose, it is OK if the lag information is a bit
> > > stale.
> > >
> > > * AFAIK, the Java KafkaConsumer APIs do not seem to expose any
> > >   information directly from which I can figure out the lag within my
> > >   Java consumer application.
> > >
> > > * It seems that, alternatively, I can create a separate new
> > >   KafkaConsumer (or possibly use the existing KafkaConsumer my
> > >   consumer application is using), seek to the end, and call the
> > >   "position()" API to figure out the end offsets for all partitions I
> > >   am interested in. Since my consumer application already knows
> > >   which offset the consumer is at, I can then figure out the lag.
> > >   However, for this one basic piece of information, this solution is
> > >   a bit more involved for our framework, for various reasons. It also
> > >   requires either a separate KafkaConsumer just to figure out the
> > >   lag, or re-using the KafkaConsumer our application already uses and
> > >   somehow implementing the logic of seeking forward to find the end
> > >   offset and then resetting back (not sure yet if that is feasible
> > >   without issues).
> > >
> > > * If I am not mistaken, looking at the KafkaConsumer code itself, the
> > >   "high watermark" information seems to be already there in
> > >   FetchResponse (FetchResponse.PartitionData). It's just that it is
> > >   not exposed. Is there any way to retrieve this information in my
> > >   consumer application?
> > >
> > > In general, wouldn't it be useful for a consumer application to have
> > > lag information available with a simple API call?
> > >
> > > Thanks,
> > > M
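The `endOffsets` route mentioned in the thread can be combined with `position()` to compute lag directly. Below is a minimal sketch of just the arithmetic. It assumes lag means end offset minus current position, where the position is the offset of the next record the consumer will fetch. `ConsumerLag`, `lagPerPartition` and `totalLag` are hypothetical helpers. They are generic over the partition key, so the test uses strings where a real consumer would use `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client API.
final class ConsumerLag {
    private ConsumerLag() {}

    /**
     * lag = endOffset - position for every partition that has both values.
     * P stands in for Kafka's TopicPartition; any key type works.
     * The two maps are usually read at slightly different moments, so a stale end
     * offset can be smaller than the position; such lag is clamped to 0.
     */
    static <P> Map<P, Long> lagPerPartition(Map<P, Long> endOffsets, Map<P, Long> positions) {
        Map<P, Long> lag = new HashMap<>();
        for (Map.Entry<P, Long> e : positions.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end == null) {
                continue; // no end offset known for this partition: skip it rather than guess
            }
            lag.put(e.getKey(), Math.max(0L, end - e.getValue()));
        }
        return lag;
    }

    /** Sum of per-partition lag, e.g. to compare against a threshold for adding threads. */
    static long totalLag(Map<?, Long> lagPerPartition) {
        long total = 0L;
        for (long l : lagPerPartition.values()) {
            total += l;
        }
        return total;
    }
}
```

Against a live consumer, the inputs would come from `consumer.endOffsets(consumer.assignment())` and a loop calling `consumer.position(tp)` for each assigned partition. As Stephen notes, `endOffsets` costs an extra round trip to the brokers, which is acceptable for a once-a-minute check. Since `KafkaConsumer` is not thread-safe, these calls belong on the polling thread, which then hands only the resulting numbers to whatever scales the processing threads.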