I have used what Gwen suggested, with one addition to avoid false positives: while consuming records, keep track of the *last* consumed offset for each partition. When you get the "TimeOut Exception", compare that offset with the latest offset on the broker for the same partition of the consumed topic (e.g. the JMX bean *LogEndOffset* for that topic and partition).
This works well. In our use case, we were using the High Level Consumer for only a *single* topic. I hope this helps!

Thanks,
Bhavesh

On Sun, May 10, 2015 at 2:03 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> @Gwen- But that only works for topics that have low enough traffic that you
> would ever actually hit that timeout.
>
> The Confluent schema registry needs to do something similar to make sure it
> has fully consumed the topic it stores data in so it doesn't serve stale
> data. We know in our case we'll only have a single producer to the topic
> (the current leader of the schema registry cluster) so we have a different
> solution. We produce a message to the topic (which is 1 partition, but this
> works for a topic partition too), grab the resulting offset from the
> response, then consume until we see the message we produced. Obviously this
> isn't ideal since we a) have to produce extra bogus messages to the topic
> and b) it only works in the case where you know the consumer is also the
> only producer.
>
> The new consumer interface sort of addresses this since it has seek
> functionality, where one of the options is seekToEnd. However, I think you
> have to be very careful with this, especially using the current
> implementation. It seeks to the end, but it also marks those messages as
> consumed. This means that even if you keep track of your original position
> and seek back to it, if you use background offset commits you could end up
> committing incorrect offsets, crashing, and then missing some messages when
> another consumer claims that partition (or just due to another consumer
> joining the group).
>
> Not sure if there are many other use cases for grabbing the offset data
> with a simple API. Might mean there's a use case for either some additional
> API or some utilities independent of an actual consumer instance which
> allow you to easily query the state of topics/partitions.
>
> On Sun, May 10, 2015 at 12:43 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> > For Flume, we use the timeout configuration and catch the exception, with
> > the assumption that "no messages for few seconds" == "the end".
> >
> > On Sat, May 9, 2015 at 2:04 AM, James Cheng <jch...@tivo.com> wrote:
> >
> > > Hi,
> > >
> > > I want to use the high level consumer to read all partitions for a topic,
> > > and know when I have reached "the end". I know "the end" might be a little
> > > vague, since items keep showing up, but I'm trying to get as close as
> > > possible. I know that more messages might show up later, but I want to know
> > > when I've received all the items that are currently available in the topic.
> > >
> > > Is there a standard/recommended way to do this?
> > >
> > > I know one way to do it is to first issue an OffsetRequest for each
> > > partition, which would get me the last offset, and then use that
> > > information in my high level consumer to detect when I've reached that a
> > > message with that offset. Which is exactly what the SimpleConsumer example
> > > does (
> > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > > ).
> > > That involves finding the leader for the partition, etc etc. Not hard, but
> > > a bunch of steps.
> > >
> > > I noticed that kafkacat has an option similar to what I'm looking for:
> > >   -e    Exit successfully when last message received
> > >
> > > Looking at the code, it appears that a FetchRequest returns the
> > > HighwaterMarkOffset mark for a partition, and the API docs confirm that:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
> > >
> > > Does the Java high-level consumer expose the HighwaterMarkOffset in any
> > > way? I looked but I couldn't find such a thing.
> > >
> > > Thanks,
> > > -James
>
> --
> Thanks,
> Ewen
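
P.S. For anyone who wants the bookkeeping spelled out, here is a minimal sketch of the "track the last consumed offset, then compare on timeout" check described at the top of this mail. It is not our production code and it deliberately uses no Kafka classes: `EndOfTopicTracker` and its methods are hypothetical names made up for illustration. It assumes the broker's LogEndOffset is the offset the next appended message will receive, so the newest existing message sits at LogEndOffset - 1. How you fetch that value (JMX, an OffsetRequest, etc.) is left to the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of any Kafka API: remembers the last offset
// consumed per partition and compares it with a broker-reported log end offset.
class EndOfTopicTracker {
    // partition id -> offset of the last record handed to the application
    private final Map<Integer, Long> lastConsumed = new HashMap<>();

    // Optional: seed a partition with the offset the consumer will read next
    // (e.g. a committed offset), so an idle partition is not reported as lagging.
    void seedPosition(int partition, long nextOffsetToRead) {
        lastConsumed.put(partition, nextOffsetToRead - 1);
    }

    // Call for every record consumed; keeps the highest offset seen.
    void recordConsumed(int partition, long offset) {
        lastConsumed.merge(partition, offset, Math::max);
    }

    // Call after a consumer timeout. logEndOffset is assumed to be the offset
    // the next appended message will get, so the newest message is at
    // logEndOffset - 1. Unseen partitions are treated as "nothing consumed".
    boolean isCaughtUp(int partition, long logEndOffset) {
        long last = lastConsumed.getOrDefault(partition, -1L);
        return last + 1 >= logEndOffset;
    }

    // True only if every partition in the map has been fully consumed.
    boolean allCaughtUp(Map<Integer, Long> logEndOffsets) {
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            if (!isCaughtUp(e.getKey(), e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        EndOfTopicTracker tracker = new EndOfTopicTracker();
        tracker.recordConsumed(0, 41L);
        System.out.println(tracker.isCaughtUp(0, 42L)); // true: offset 41 is the newest
        System.out.println(tracker.isCaughtUp(0, 50L)); // false: offsets 42..49 remain
    }
}
```

The idea is that a timeout alone is treated only as a hint: you declare "the end" only when the timeout fires and `allCaughtUp` returns true for the log end offsets you just read from the broker. Otherwise you keep consuming.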