I have a similar use-case to Johan. We do stream processing off the topics in the backend but I'd like to expose a recent sample of a topic's data to a front-end web-app (just in a synchronous, click-a-button-and-see-results fashion). If I can only start from the last file offset 500MB behind current and not (current - n bytes) then the data might be very stale depending on how fast that topic is being filled. I could iterate from the last offset and keep only the final n, but that might mean processing 500MB each time just to grab 10 messages.
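
For what it's worth, the "iterate and keep only the final n" fallback is at least cheap on memory, even if it is not cheap on I/O. A minimal Python sketch of that idea, where `fake_fetch` is a hypothetical stand-in for iterating over a fetched message set (it is not a Kafka API):

```python
from collections import deque
from typing import Iterable, Iterator, List


def last_n(messages: Iterable[bytes], n: int) -> List[bytes]:
    """Consume the whole iterable but retain only the final n items."""
    # deque(maxlen=n) drops the oldest entry on each append once it is full,
    # so memory stays O(n) even though every message is still read.
    return list(deque(messages, maxlen=n))


def fake_fetch(count: int) -> Iterator[bytes]:
    # Hypothetical placeholder for messages read forward from a known offset.
    for i in range(count):
        yield f"msg-{i}".encode()


print(last_n(fake_fetch(1000), 3))  # [b'msg-997', b'msg-998', b'msg-999']
```

This still reads everything between the checkpoint offset and the head of the log, so it does nothing for the 500MB scan; it only bounds what is held in memory.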
Johan, are you using just the simple FetchRequest? Did you get around the InvalidMessageSizeException when you try to force a fetch offset different from those returned by getOffsetsBefore? Or are you also starting from that last known offset and iterating forwards by the desired amount?

On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl <johan.lund...@gmail.com> wrote:

> I've had a similar use case where we want to browse and display the latest
> few messages in different topics in a webapp.
>
> This kind of works by doing as you describe: submitting a FetchRequest with
> an offset of messages_desired * avg_bytes_per_message plus a bit more.
> You'll get the ByteBuffer and then you can strip away until you reach a
> message. How to find where a message starts is not something that I've
> understood completely yet (I've not studied the protocol very carefully),
> but splitting the buffer by the pattern \u0000\u0000.?.?.?.?.?.?.?.? seems
> to work pretty well in our case, at least when there is no batching or
> compression involved.
>
> If someone has hints on a better way to find a message header, I'd also
> appreciate this info.
>
> On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote:
>
> > There is no index-based access to messages in 0.7 like there is in 0.8.
> > You have to start from a known good offset and iterate through the
> > messages.
> >
> > What's your use case? Running a job periodically that reads the latest N
> > messages from the queue? Is it impractical to run from the last known
> > offset and only keep the last N?
> >
> > On 7/19/13 3:45 AM, Shane Moriah wrote:
> >
> >> We're running Kafka 0.7 and I'm hitting some issues trying to access the
> >> newest n messages in a topic (or at least in a broker/partition combo)
> >> and wondering if my use case just isn't supported or if I'm missing
> >> something. What I'd like to be able to do is get the most recent offset
> >> from a broker/partition combo, subtract an amount of bytes roughly
> >> equivalent to messages_desired*bytes_per_message and then issue a
> >> FetchRequest with that offset and amount of bytes.
> >>
> >> I gathered from this post
> >> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> >> that I need to use the Simple Consumer in order to do offset
> >> manipulation beyond the start from beginning and start from end
> >> options. And I saw from this post
> >> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> >> that the offsets returned by getOffsetsBefore are really only the major
> >> checkpoints when files are rolled over, every 500MB by default. I also
> >> found that if I take an offset returned from getOffsetsBefore and
> >> subtract a fixed value, say 100KB, and submit that offset with a
> >> FetchRequest I get a kafka.common.InvalidMessageSizeException,
> >> presumably since my computed offset didn't align with a real message
> >> offset.
> >>
> >> As far as I can tell, this leaves me only able to find the most recent
> >> milestone offset, perhaps up to 500MB behind current data, and extract a
> >> batch from that point forward. Is there any other way that I'm missing
> >> here? The two things that seem to be lacking are access to the most
> >> recent offset and the ability to roll back from that offset by a fixed
> >> amount of bytes or messages without triggering the
> >> InvalidMessageSizeException.
> >>
> >> Thanks,
> >> Shane
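
On the question above about finding a message header in an unaligned buffer: one alternative to splitting on a byte pattern is to try each byte position and accept it only when the length prefix and the checksum agree. The Python sketch below assumes (this is an assumption about the 0.7 layout, not something confirmed in this thread) that each message is a 4-byte big-endian size followed by a 1-byte magic of 1, a 1-byte attributes field, a 4-byte CRC32 of the payload, and then the payload. `encode`, `parse_at`, and `resync` are hypothetical helper names, and compressed or batched messages are not handled.

```python
import struct
import zlib
from typing import List, Optional, Tuple

HEADER = 6  # ASSUMED layout after the 4-byte size: magic(1) + attributes(1) + crc32(4)


def encode(payload: bytes) -> bytes:
    """Build one message in the assumed layout (only used for the demo)."""
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    body = struct.pack(">BBI", 1, 0, crc) + payload
    return struct.pack(">I", len(body)) + body


def parse_at(buf: bytes, pos: int) -> Optional[Tuple[bytes, int]]:
    """Return (payload, next_pos) if a checksum-valid message starts at pos."""
    if pos + 4 + HEADER > len(buf):
        return None
    (size,) = struct.unpack_from(">I", buf, pos)
    end = pos + 4 + size
    if size < HEADER or end > len(buf):
        return None
    magic, _attrs, crc = struct.unpack_from(">BBI", buf, pos + 4)
    payload = buf[pos + 4 + HEADER:end]
    if magic != 1 or (zlib.crc32(payload) & 0xFFFFFFFF) != crc:
        return None
    return payload, end


def resync(buf: bytes) -> List[bytes]:
    """Advance one byte at a time until a valid message parses, then jump past it."""
    out: List[bytes] = []
    pos = 0
    while pos < len(buf):
        hit = parse_at(buf, pos)
        if hit is None:
            pos += 1  # not a message boundary; try the next byte
            continue
        payload, pos = hit
        out.append(payload)
    return out


stream = encode(b"alpha") + encode(b"beta") + encode(b"gamma")
# Start 3 bytes into the first message, as an unaligned fetch offset would.
print(resync(stream[3:]))  # [b'beta', b'gamma']
```

A CRC match on random bytes is unlikely but not impossible, so this should be read as a heuristic for browsing recent data, not a guarantee of correct framing.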