I've had a similar use case where we want to browse and display the latest few messages in different topics in a webapp.
This kind of works by doing as you describe; submitting a FetchRequest with an offset of messages_desired * avg_bytes_per_message plus a bit more. You'll get the ByteBuffer and then you can strip away until you reach a message. How to find where a message starts is not something that I've understood completely yet (I've not studied the protocol very carefully), but splitting the buffer by the pattern \u0000\u0000.?.?.?.?.?.?.?.? seems to work pretty well in our case, at least when there is no batching or compression involved. If someone has hints on a better way to find a message header, I'd also appreciate this info. On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote: > There is not index-based access to messages in 0.7 like there is in 0.8. > You have to start from a known good offset and iterate through the messages. > > What's your use case? Running a job periodically that reads the latest N > message from the queue? Is it impractical to run from the last known offset > and only keep the last N? > > > On 7/19/13 3:45 AM, Shane Moriah wrote: > >> We're running Kafka 0.7 and I'm hitting some issues trying to access the >> newest n messages in a topic (or at least in a broker/partition combo) and >> wondering if my use case just isn't supported or if I'm missing something. >> What I'd like to be able to do is get the most recent offset from a >> broker/partition combo, subtract an amount of bytes roughly equivalent to >> messages_desired*bytes_per_**message and then issue a FetchRequest with >> that >> offset and amount of bytes. >> >> I gathered from this >> post<http://mail-archives.**apache.org/mod_mbox/kafka-** >> users/201212.mbox/%3CCCF8F23D.**5e4a%25zhaoyong...@gmail.com%**3E<http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E> >> > >> that >> I need to use the Simple Consumer in order to do offset manipulation >> beyond >> the start from beginning and start from end options. And I saw from this >> post<http://mail-archives.**apache.org/mod_mbox/incubator-** >> kafka-users/201209.mbox/%**3CCALD69j0iDCZZFF3nm-** >> wrfvW5Y6wwxRZFOL8A1QQFugQUKdo6**x...@mail.gmail.com%3E<http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E> >> > >> that >> the offsets returned by getOffsetsBefore are really only the major >> checkpoints when files are rolled over, every 500MB by default. I also >> found that if I take an offset returned from getOffsetsBefore and subtract >> a fixed value, say 100KB, and submit that offset with a FetchRequest I get >> a kafka.common.**InvalidMessageSizeException, presumably since my >> computed >> offset didn't align with a real message offset. >> >> As far as I can tell, this leaves me only able to find the most recent >> milestone offset, perhaps up to 500MB behind current data, and extract a >> batch from that point forward. Is there any other way that I'm missing >> here? The two things that seem to be lacking are access to the most recent >> offset and the ability to rollback from that offset by a fixed amount of >> bytes or messages without triggering the InvalidMessageSizeException. >> >> Thanks, >> Shane >> >> >