Here is my current (very hacky) piece of code handling this part:

import java.nio.ByteBuffer
import kafka.api.FetchRequest
import kafka.consumer.SimpleConsumer

def getLastMessages(fetchSize: Int = 10000): List[String] = {
  val sConsumer = new SimpleConsumer(clusterip, 9092, 1000, 1024000)
  // time = -1 asks for the most recent offsets; they come back newest first
  val currentOffset = sConsumer.getOffsetsBefore(topic, 0, -1, 3)
  // Roll back fetchSize bytes from the newest offset, clamped so we never
  // rewind past the oldest offset the broker reported
  val fetchRequest = new FetchRequest(topic, 0,
    (currentOffset(0) - fetchSize).max(currentOffset(currentOffset.length - 1)),
    fetchSize)
  val msgBuffer = sConsumer.fetch(fetchRequest)
  sConsumer.close()

  def decodeBuffer(buffer: ByteBuffer, encoding: String,
                   arrSize: Int = msgBuffer.sizeInBytes.toInt - 6): String = {
    // An empty or truncated buffer throws on getInt; treat that as "no data"
    val size: Int = try { buffer.getInt } catch { case _: Throwable => -1 }
    if (size < 0) return s"No recent messages in topic $topic"
    val bytes = new Array[Byte](arrSize)
    buffer.get(bytes)
    new String(bytes, encoding)
  }

  val decStr = decodeBuffer(msgBuffer.getBuffer, "UTF-8")
  // Heuristic: a message header tends to start with two zero bytes; split on
  // that pattern and drop the (likely partial) first fragment
  val header = "\u0000\u0000.?.?.?.?.?.?.?.?"
  val strLst = decStr.split(header).toList
  if (strLst.size > 1) strLst.tail else strLst
}
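Since the open question in this thread is how to find where a message starts, here is the direction I'm experimenting with instead of that regex split. It's only a sketch based on my reading of the 0.7 wire format (a 4-byte length, a magic byte, an attributes byte when magic == 1, a CRC32 of the payload, then the payload, with the length counting everything after itself); the maxMessageSize bound is just a sanity limit I picked. It scans for the first position where size, magic byte and payload checksum all line up:

import java.nio.ByteBuffer
import java.util.zip.CRC32

def findMessageStart(buf: ByteBuffer, maxMessageSize: Int = 1024 * 1024): Int = {
  val bytes = new Array[Byte](buf.remaining)
  buf.duplicate.get(bytes)              // copy without moving the original position
  var pos = 0
  while (pos + 9 <= bytes.length) {     // 4 length bytes + 5 minimum header bytes
    val size  = ByteBuffer.wrap(bytes, pos, 4).getInt
    val magic = bytes(pos + 4)
    if (size > 0 && size <= maxMessageSize && (magic == 0 || magic == 1)) {
      val headerLen    = if (magic == 1) 6 else 5  // magic [+ attributes] + crc
      val payloadStart = pos + 4 + headerLen
      val payloadLen   = size - headerLen
      if (payloadLen >= 0 && payloadStart + payloadLen <= bytes.length) {
        val storedCrc = ByteBuffer.wrap(bytes, payloadStart - 4, 4).getInt & 0xffffffffL
        val crc = new CRC32
        crc.update(bytes, payloadStart, payloadLen)
        if (crc.getValue == storedCrc) return pos  // header and checksum line up
      }
    }
    pos += 1  // no luck, slide forward one byte and retry
  }
  -1  // no plausible message boundary in this buffer
}

The checksum test is what makes a false positive unlikely, and a byte-by-byte scan is cheap at these buffer sizes. With a valid start position in hand I can slice the buffer there and decode from that point instead of regex-splitting; if someone can confirm the format above, it would also mean an arbitrary (offset - n) fetch can be resynced rather than dying with the InvalidMessageSizeException discussed below.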
On Fri, Jul 19, 2013 at 10:02 PM, Shane Moriah <shanemor...@gmail.com> wrote:

> I have a similar use-case to Johan. We do stream processing off the topics
> in the backend, but I'd like to expose a recent sample of a topic's data to
> a front-end web-app (just in a synchronous, click-a-button-and-see-results
> fashion). If I can only start from the last file offset, 500MB behind
> current, and not from (current - n bytes), then the data might be very
> stale depending on how fast that topic is being filled. I could iterate
> from the last offset and keep only the final n, but that might mean
> processing 500MB each time just to grab 10 messages.
>
> Johan, are you using just the simple FetchRequest? Did you get around the
> InvalidMessageSizeException when you try to force a fetch offset different
> from those returned by getOffsetsBefore? Or are you also starting from
> that last known offset and iterating forwards by the desired amount?
>
>
> On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl <johan.lund...@gmail.com> wrote:
>
> > I've had a similar use case where we want to browse and display the
> > latest few messages in different topics in a webapp.
> >
> > This kind of works by doing as you describe: submitting a FetchRequest
> > with an offset of messages_desired * avg_bytes_per_message plus a bit
> > more. You'll get the ByteBuffer and then you can strip away until you
> > reach a message. How to find where a message starts is not something
> > that I've understood completely yet (I've not studied the protocol very
> > carefully), but splitting the buffer by the pattern
> > \u0000\u0000.?.?.?.?.?.?.?.? seems to work pretty well in our case, at
> > least when there is no batching or compression involved.
> >
> > If someone has hints on a better way to find a message header, I'd also
> > appreciate this info.
> >
> >
> > On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote:
> >
> > > There is no index-based access to messages in 0.7 like there is in
> > > 0.8. You have to start from a known good offset and iterate through
> > > the messages.
> > >
> > > What's your use case? Running a job periodically that reads the latest
> > > N messages from the queue? Is it impractical to run from the last
> > > known offset and only keep the last N?
> > >
> > >
> > > On 7/19/13 3:45 AM, Shane Moriah wrote:
> > >
> > > > We're running Kafka 0.7 and I'm hitting some issues trying to access
> > > > the newest n messages in a topic (or at least in a broker/partition
> > > > combo) and wondering if my use case just isn't supported or if I'm
> > > > missing something.
> > > > What I'd like to be able to do is get the most recent offset from a
> > > > broker/partition combo, subtract an amount of bytes roughly
> > > > equivalent to messages_desired * bytes_per_message and then issue a
> > > > FetchRequest with that offset and amount of bytes.
> > > >
> > > > I gathered from this post
> > > > <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> > > > that I need to use the Simple Consumer in order to do offset
> > > > manipulation beyond the start-from-beginning and start-from-end
> > > > options. And I saw from this post
> > > > <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> > > > that the offsets returned by getOffsetsBefore are really only the
> > > > major checkpoints when files are rolled over, every 500MB by
> > > > default. I also found that if I take an offset returned from
> > > > getOffsetsBefore and subtract a fixed value, say 100KB, and submit
> > > > that offset with a FetchRequest, I get a
> > > > kafka.common.InvalidMessageSizeException, presumably since my
> > > > computed offset didn't align with a real message offset.
> > > >
> > > > As far as I can tell, this leaves me only able to find the most
> > > > recent milestone offset, perhaps up to 500MB behind current data,
> > > > and extract a batch from that point forward. Is there any other way
> > > > that I'm missing here? The two things that seem to be lacking are
> > > > access to the most recent offset and the ability to roll back from
> > > > that offset by a fixed amount of bytes or messages without
> > > > triggering the InvalidMessageSizeException.
> > > >
> > > > Thanks,
> > > > Shane
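P.S. In case it helps anyone reading the archive later: the clamping trick
from my snippet at the top, written out as a minimal sketch with the named
0.7 offset constants (assuming I'm reading kafka.api.OffsetRequest right
that LatestTime is -1L and EarliestTime is -2L):

import kafka.api.OffsetRequest
import kafka.consumer.SimpleConsumer

// Roll back rollbackBytes from the newest known offset, clamped to the
// oldest available one, so the FetchRequest never points before the start
// of the log on disk.
def safeStartOffset(consumer: SimpleConsumer, topic: String,
                    partition: Int, rollbackBytes: Long): Long = {
  val latest   = consumer.getOffsetsBefore(topic, partition, OffsetRequest.LatestTime, 1)(0)
  val earliest = consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime, 1)(0)
  (latest - rollbackBytes).max(earliest)
}

The resulting offset can of course still land in the middle of a message,
which is exactly the InvalidMessageSizeException problem Shane describes;
that's what the boundary scan further up is meant to patch over.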