I found this embedded Kafka example online (https://gist.github.com/mardambey/2650743), which I am rewriting to work with Kafka 0.8.
Can someone help me rewrite this portion:

    val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
    var offset = 0L
    var i = 0
    while (true) {
      val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
      for (msg <- cons.fetch(fetchRequest)) {
        i = i + 1
        println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
        offset = msg.offset
      }
    }

I have this so far:

    val partition = 0
    var offset = 0L
    var i = 0
    while (true) {
      //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
      val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024).build()
      val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)
      val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
      println("consumed Message " + messageSet(0).message)
    }

This currently loops forever, printing the same message, because it never increments the offset. I'm confused because I believe 0.8 no longer uses the old offsets, and things are supposed to be more user-friendly with an incrementing counter. Any help would be appreciated.
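
For reference, here is a minimal sketch of how the 0.8 loop might advance the offset. In 0.8 the offset is still there, but it is a per-message sequence number rather than a byte position, so the next fetch should start at the last consumed message's `nextOffset`. The host, port, timeout, buffer size, topic name, and the `clientId` value are assumptions for illustration (the first four are copied from the original example), and this is untested against a live broker:

```scala
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object SimpleConsumerSketch extends App {
  // Placeholder settings: adjust to match the embedded broker.
  val topic = "TEST"
  val partition = 0
  val clientId = "sketch-client" // hypothetical name; 0.8 requires a client id
  val consumer = new SimpleConsumer("localhost", 9090, 100, 1024, clientId)

  var offset = 0L
  var i = 0
  while (true) {
    val request = new FetchRequestBuilder()
      .clientId(clientId)
      .addFetch(topic, partition, offset, 1024)
      .build()
    val messages = consumer.fetch(request).messageSet(topic, partition)

    for (msg <- messages) {
      i += 1
      // Copy the payload bytes out of the ByteBuffer and decode them as UTF-8.
      val payload = msg.message.payload
      val bytes = new Array[Byte](payload.remaining)
      payload.get(bytes)
      println("consumed [" + i + "]: offset = " + msg.offset +
        ", payload = " + new String(bytes, "UTF-8"))
      // Resume after this message on the next fetch.
      offset = msg.nextOffset
    }

    // Avoid a tight loop when no new messages have arrived yet.
    if (messages.isEmpty) Thread.sleep(100)
  }
}
```

Iterating over the whole message set, instead of only reading `messageSet(0)`, also means a fetch that returns several messages prints each of them once.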