I tried to mimic the poll method that we have with the new consumer API in the 0.9.0.0 version. Here is what I have:
def readFromKafka() = { val streams = consumerStreamsMap.get(consumerConfig.topic) @tailrec def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = { val iterator = streams.iterator() val isTimeLapsed = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) > pollConfig.pollTimeout if (iterator.hasNext && isTimeLapsed) { val newMessages = iterator.next.asScala.toSeq.map { case msg => new String(msg.message()) } poll(pollConfig, messages ++ newMessages) } else { messages } } val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String]) // TODO: re-work after the entire code works! consumer.commitOffsets() toTsDataPointSeq(messages).flatten } Is this good enough? Am I doing it right? Am I committing the right Offset?