Here is my current (very hacky) piece of code handling this part:

  import java.nio.ByteBuffer

  import kafka.api.{FetchRequest, OffsetRequest}
  import kafka.consumer.SimpleConsumer

  // clusterip and topic are defined elsewhere in the enclosing class.
  def getLastMessages(fetchSize: Int = 10000): List[String] = {
    val sConsumer = new SimpleConsumer(clusterip, 9092, 1000, 1024000)

    // Up to 3 checkpoint offsets before "latest" (-1): offsets(0) is the
    // newest, the last element the oldest one we can safely fetch from.
    val offsets = sConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 3)

    // Step back fetchSize bytes from the newest offset, but never past the
    // oldest known checkpoint.
    val startOffset = (offsets(0) - fetchSize).max(offsets(offsets.length - 1))
    val msgBuffer = sConsumer.fetch(new FetchRequest(topic, 0, startOffset, fetchSize))

    sConsumer.close()

    // Hacky decode: read the first 4 bytes as a size field (really just a
    // check that the buffer is non-empty), then dump the next
    // sizeInBytes - 6 bytes of the buffer as one big string.
    def decodeBuffer(buffer: ByteBuffer, encoding: String,
                     arrSize: Int = msgBuffer.sizeInBytes.toInt - 6): String = {
      val size = try buffer.getInt catch { case _: Throwable => -1 }
      if (size < 0) return s"No recent messages in topic $topic"
      val bytes = new Array[Byte](arrSize)
      buffer.get(bytes)
      new String(bytes, encoding)
    }
    val decStr = decodeBuffer(msgBuffer.getBuffer, "UTF-8")

    // Split on what looks like a message header: the 4-byte size of a small
    // message starts with two zero bytes, and the .?s soak up the rest of
    // the size field plus magic, attributes and CRC.
    val header = "\u0000\u0000.?.?.?.?.?.?.?.?"
    val strLst = decStr.split(header).toList

    // The first element is usually a truncated partial message; drop it.
    if (strLst.size > 1) strLst.tail else strLst
  }
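
With the defaults this steps back roughly 10KB from the newest offset and
splits out whatever complete messages land in that window; a quick smoke
test (clusterip and topic in scope, as above) looks like:

  val recent = getLastMessages()
  recent.take(10).foreach(println)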


On Fri, Jul 19, 2013 at 10:02 PM, Shane Moriah <shanemor...@gmail.com> wrote:

> I have a similar use-case to Johan.  We do stream processing off the topics
> in the backend but I'd like to expose a recent sample of a topic's data to
> a front-end web-app (just in a synchronous, click-a-button-and-see-results
> fashion).  If I can only start from the last file-roll offset, 500MB behind
> current, and not from (current - n bytes), then the data might be very
> stale depending on how fast that topic is being filled. I could iterate from the
> last offset and keep only the final n, but that might mean processing 500MB
> each time just to grab 10 messages.
>
> Johan, are you using just the simple FetchRequest?  Did you get around the
> InvalidMessageSizeException when trying to force a fetch offset different from
> those returned by getOffsetsBefore?  Or are you also starting from that
> last known offset and iterating forwards by the desired amount?
>
>
> On Fri, Jul 19, 2013 at 11:33 AM, Johan Lundahl <johan.lund...@gmail.com> wrote:
>
> > I've had a similar use case where we want to browse and display the
> > latest few messages in different topics in a webapp.
> >
> > This kind of works by doing as you describe: submitting a FetchRequest
> > with an offset of messages_desired * avg_bytes_per_message plus a bit
> > more. You'll get the ByteBuffer and then you can strip away until you
> > reach a message. How to find where a message starts is not something
> > that I've understood completely yet (I've not studied the protocol very
> > carefully), but splitting the buffer by the pattern
> > \u0000\u0000.?.?.?.?.?.?.?.? seems to work pretty well in our case, at
> > least when there is no batching or compression involved.
> >
> > If someone has hints on a better way to find a message header, I'd also
> > appreciate this info.
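
For what it's worth, a way to find boundaries without the regex is to walk
the 0.7 framing itself. A minimal sketch, assuming the buffer starts exactly
on a message boundary (true when the fetch offset came straight from
getOffsetsBefore) and that payloads are uncompressed UTF-8:

  import java.nio.ByteBuffer

  // Kafka 0.7 frames each message as: a 4-byte size, a 1-byte magic, an
  // attributes byte when magic == 1, a 4-byte CRC, then the payload.
  def walkMessages(buffer: ByteBuffer): List[String] = {
    val out = scala.collection.mutable.ListBuffer[String]()
    while (buffer.remaining >= 4) {
      val size = buffer.getInt                  // bytes from magic through payload
      if (size < 1 || size > buffer.remaining)  // truncated message at the tail
        return out.toList
      val magic = buffer.get
      val headerLen = if (magic == 1) 6 else 5  // magic [+ attrs] + CRC
      if (size < headerLen) return out.toList   // unexpected framing, bail out
      buffer.position(buffer.position + (headerLen - 1))  // skip attrs/CRC
      val payload = new Array[Byte](size - headerLen)
      buffer.get(payload)
      out += new String(payload, "UTF-8")
    }
    out.toList
  }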
> >
> >
> > On Fri, Jul 19, 2013 at 2:17 PM, David Arthur <mum...@gmail.com> wrote:
> >
> > > There is no index-based access to messages in 0.7 like there is in
> > > 0.8. You have to start from a known good offset and iterate through
> > > the messages.
> > >
> > > What's your use case? Running a job periodically that reads the latest
> > > N messages from the queue? Is it impractical to run from the last
> > > known offset and only keep the last N?
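
A rough sketch of that iterate-and-keep-last-N idea against the 0.7 Scala
API; lastN and payloadToString are hypothetical names, and note that in 0.7
a MessageAndOffset carries the byte offset of the *following* message, which
is what gets fed to the next fetch:

  import kafka.api.FetchRequest
  import kafka.consumer.SimpleConsumer
  import kafka.message.Message
  import scala.collection.mutable

  // Hypothetical helper: copy a Message payload out as a UTF-8 string.
  def payloadToString(m: Message): String = {
    val buf = m.payload
    val bytes = new Array[Byte](buf.remaining)
    buf.get(bytes)
    new String(bytes, "UTF-8")
  }

  // Start at a known-good offset and keep only the newest n payloads in a
  // bounded queue; stop once a fetch comes back empty (head of the log).
  def lastN(consumer: SimpleConsumer, topic: String, partition: Int,
            startOffset: Long, n: Int): List[String] = {
    val window = mutable.Queue[String]()
    var offset = startOffset
    var done = false
    while (!done) {
      val msgs = consumer.fetch(new FetchRequest(topic, partition, offset, 1024 * 1024))
      var sawAny = false
      for (mo <- msgs) {
        sawAny = true
        window.enqueue(payloadToString(mo.message))
        if (window.size > n) window.dequeue()
        offset = mo.offset  // 0.7: byte offset of the next message
      }
      if (!sawAny) done = true
    }
    window.toList
  }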
> > >
> > >
> > > On 7/19/13 3:45 AM, Shane Moriah wrote:
> > >
> > >> We're running Kafka 0.7 and I'm hitting some issues trying to access
> > >> the newest n messages in a topic (or at least in a broker/partition
> > >> combo) and wondering if my use case just isn't supported or if I'm
> > >> missing something.  What I'd like to be able to do is get the most
> > >> recent offset from a broker/partition combo, subtract an amount of
> > >> bytes roughly equivalent to messages_desired * bytes_per_message and
> > >> then issue a FetchRequest with that offset and amount of bytes.
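
In 0.7 API terms that plan would look roughly like the sketch below (host
and topic are placeholders); as Shane goes on to describe, the computed
offset generally does not land on a message boundary:

  import kafka.api.{FetchRequest, OffsetRequest}
  import kafka.consumer.SimpleConsumer

  val consumer = new SimpleConsumer("host", 9092, 1000, 1024000)
  // OffsetRequest.LatestTime (-1) asks for the current end-of-log offset.
  val latest = consumer.getOffsetsBefore("topic", 0, OffsetRequest.LatestTime, 1)(0)
  val backBy = 10 * 1000  // messages_desired * bytes_per_message, roughly
  // This is the fetch that throws InvalidMessageSizeException when the
  // computed offset lands mid-message.
  val messages = consumer.fetch(new FetchRequest("topic", 0, latest - backBy, backBy))
  consumer.close()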
> > >>
> > >> I gathered from this post
> > >> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3cccf8f23d.5e4a%25zhaoyong...@gmail.com%3E>
> > >> that I need to use the Simple Consumer in order to do offset
> > >> manipulation beyond the start-from-beginning and start-from-end
> > >> options.  And I saw from this post
> > >> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%3ccald69j0idczzff3nm-wrfvw5y6wwxrzfol8a1qqfugqukdo...@mail.gmail.com%3E>
> > >> that the offsets returned by getOffsetsBefore are really only the
> > >> major checkpoints when files are rolled over, every 500MB by default.
> > >> I also found that if I take an offset returned from getOffsetsBefore
> > >> and subtract a fixed value, say 100KB, and submit that offset with a
> > >> FetchRequest I get a kafka.common.InvalidMessageSizeException,
> > >> presumably since my computed offset didn't align with a real message
> > >> offset.
> > >>
> > >> As far as I can tell, this leaves me only able to find the most
> > >> recent milestone offset, perhaps up to 500MB behind current data,
> > >> and extract a batch from that point forward. Is there any other way
> > >> that I'm missing here? The two things that seem to be lacking are
> > >> access to the most recent offset and the ability to roll back from
> > >> that offset by a fixed amount of bytes or messages without triggering
> > >> the InvalidMessageSizeException.
> > >>
> > >> Thanks,
> > >> Shane
> > >>
> > >
> >
>
