@Gwen: But that only works for topics with traffic low enough that you would ever actually hit that timeout.

The Confluent schema registry needs to do something similar to make sure it has fully consumed the topic it stores data in, so it doesn't serve stale data. We know in our case we'll only have a single producer to the topic (the current leader of the schema registry cluster), so we have a different solution. We produce a message to the topic (which has 1 partition, but this works for any single topic partition), grab the resulting offset from the response, then consume until we see the message we produced. Obviously this isn't ideal since a) we have to produce extra bogus messages to the topic and b) it only works in the case where you know the consumer is also the only producer.

The new consumer interface sort of addresses this since it has seek functionality, where one of the options is seekToEnd. However, I think you have to be very careful with this, especially using the current implementation. It seeks to the end, but it also marks those messages as consumed. This means that even if you keep track of your original position and seek back to it, if you use background offset commits you could end up committing incorrect offsets, crashing, and then missing some messages when another consumer claims that partition (or just due to another consumer joining the group).

Not sure if there are many other use cases for grabbing the offset data with a simple API. Might mean there's a use case for either some additional API or some utilities independent of an actual consumer instance which allow you to easily query the state of topics/partitions.

On Sun, May 10, 2015 at 12:43 AM, Gwen Shapira <gshap...@cloudera.com> wrote:

> For Flume, we use the timeout configuration and catch the exception, with
> the assumption that "no messages for few seconds" == "the end".
>
> On Sat, May 9, 2015 at 2:04 AM, James Cheng <jch...@tivo.com> wrote:
>
> > Hi,
> >
> > I want to use the high level consumer to read all partitions for a topic,
> > and know when I have reached "the end". I know "the end" might be a little
> > vague, since items keep showing up, but I'm trying to get as close as
> > possible. I know that more messages might show up later, but I want to know
> > when I've received all the items that are currently available in the topic.
> >
> > Is there a standard/recommended way to do this?
> >
> > I know one way to do it is to first issue an OffsetRequest for each
> > partition, which would get me the last offset, and then use that
> > information in my high level consumer to detect when I've reached a
> > message with that offset. Which is exactly what the SimpleConsumer example
> > does (
> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > ).
> > That involves finding the leader for the partition, etc etc. Not hard, but
> > a bunch of steps.
> >
> > I noticed that kafkacat has an option similar to what I'm looking for:
> >   -e    Exit successfully when last message received
> >
> > Looking at the code, it appears that a FetchRequest returns the
> > HighwaterMarkOffset mark for a partition, and the API docs confirm that:
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
> >
> > Does the Java high-level consumer expose the HighwaterMarkOffset in any
> > way? I looked but I couldn't find such a thing.
> >
> > Thanks,
> > -James

--
Thanks,
Ewen
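
For illustration, here is a minimal sketch of the produce-then-consume check described in the reply above. It deliberately does not use the Kafka client API: InMemoryLog, catchUp, and the "__marker__" value are hypothetical names standing in for a single topic partition, the produce call, and the bogus message. It assumes, as the reply does, that the caller is the only producer to that partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: models one topic partition in memory, not the Kafka client API. */
final class CatchUpSketch {

    /** Hypothetical append-only log; a record's offset is its list index. */
    static final class InMemoryLog {
        private final List<String> records = new ArrayList<>();

        /** Appends a record and returns its offset, as a produce response would. */
        long append(String value) {
            records.add(value);
            return records.size() - 1;
        }

        /** Returns the record stored at the given offset. */
        String read(long offset) {
            return records.get((int) offset);
        }
    }

    /**
     * Produces a marker record, then consumes from startOffset up to (but not
     * including) the marker. Assumes the caller is the sole producer, so every
     * record below the marker's offset was written before the marker.
     */
    static List<String> catchUp(InMemoryLog log, long startOffset) {
        long markerOffset = log.append("__marker__");
        List<String> consumed = new ArrayList<>();
        for (long offset = startOffset; offset < markerOffset; offset++) {
            consumed.add(log.read(offset));
        }
        return consumed;
    }

    public static void main(String[] args) {
        InMemoryLog log = new InMemoryLog();
        log.append("schema-1");
        log.append("schema-2");
        log.append("schema-3");
        // Resume from offset 1 and read everything written before the marker.
        System.out.println(catchUp(log, 1)); // prints [schema-2, schema-3]
    }
}
```

Both drawbacks mentioned in the reply show up here: every call to catchUp leaves an extra marker record in the log, and the loop bound is only meaningful if nothing else appends concurrently.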