I found this embedded Kafka example online
(https://gist.github.com/mardambey/2650743), which I am rewriting to work
with Kafka 0.8.

Can someone help me re-write this portion:


  val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
  var offset = 0L

  var i = 0

  while (true) {
    val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)

    for (msg <- cons.fetch(fetchRequest)) {
      i = i + 1
      println("consumed [ " + i + "]: offset = " + msg.offset +
        ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
      offset = msg.offset
    }
  }


I have this so far:

  val partition = 0
  var offset = 0L

  var i = 0
  while (true) {
    //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    val fetchRequest = new FetchRequestBuilder()
      .addFetch(topic, partition, offset, 1024)
      .build()

    val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)

    val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
    println("consumed Message " + messageSet(0).message)

  }

This currently loops forever because it never advances the offset, so every
fetch starts from the same position. I'm confused because I believe the old
offset is gone in 0.8, replaced by a more user-friendly incrementing counter.
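
For reference, here is a rough sketch of the loop shape I think is needed. It
does not talk to Kafka at all: FakeMessage, log and fetch are hypothetical
stand-ins I made up so the snippet runs on its own. It assumes that each
message returned by a 0.8 fetch carries its own offset and a nextOffset
(offset + 1), and that the next fetch should start from the last message's
nextOffset.

```scala
// Hypothetical stand-in for a fetched message; assumes the real 0.8 message
// wrapper exposes an offset and a nextOffset equal to offset + 1.
final case class FakeMessage(offset: Long, payload: String) {
  def nextOffset: Long = offset + 1
}

object OffsetLoopSketch {
  // In-memory stand-in for one partition's log, with offsets 0..4.
  val log: Vector[FakeMessage] =
    Vector("a", "b", "c", "d", "e").zipWithIndex.map {
      case (payload, i) => FakeMessage(i.toLong, payload)
    }

  // Stand-in for consumer.fetch(...): returns at most maxMessages messages
  // whose offset is >= the requested offset.
  def fetch(offset: Long, maxMessages: Int): Seq[FakeMessage] =
    log.dropWhile(_.offset < offset).take(maxMessages)

  // Fetch repeatedly, moving the offset past every message that was handled.
  // Returns the payloads seen and the offset the next fetch would start from.
  def consumeAll(startOffset: Long, maxMessages: Int): (Vector[String], Long) = {
    var offset = startOffset
    val seen = Vector.newBuilder[String]
    var batch = fetch(offset, maxMessages)
    while (batch.nonEmpty) {
      for (msg <- batch) {
        seen += msg.payload
        offset = msg.nextOffset // advance, otherwise the same batch repeats
      }
      batch = fetch(offset, maxMessages)
    }
    (seen.result(), offset)
  }

  def main(args: Array[String]): Unit = {
    val (payloads, next) = consumeAll(0L, 2)
    println(s"consumed ${payloads.mkString(",")}; next offset = $next")
  }
}
```

The sketch stops when a fetch comes back empty only so that it terminates; a
real consumer would presumably keep polling from the same offset instead.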

Any help would be appreciated.
