I tried to mimic the poll method of the new consumer API in version
0.9.0.0. Here is what I have:

def readFromKafka() = {
  val streams = consumerStreamsMap.get(consumerConfig.topic)

  @tailrec
  def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = {
    val iterator = streams.iterator()
    val isTimeLapsed =
      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) > pollConfig.pollTimeout
    if (iterator.hasNext && !isTimeLapsed) {
      val newMessages = iterator.next.asScala.toSeq.map {
        case msg => new String(msg.message())
      }
      poll(pollConfig, messages ++ newMessages)
    } else {
      messages
    }
  }

  val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
  val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String])

  // TODO: re-work after the entire code works!
  consumer.commitOffsets()
  toTsDataPointSeq(messages).flatten
}


Is this good enough? Am I doing it right? Am I committing the right offsets?
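
To make the intended timeout behaviour concrete, here is a minimal Kafka-free sketch of the loop I am aiming for. The names PollSketch, pollUntil, batches, deadlineMillis and now are hypothetical and only for illustration; they are not part of any Kafka API. A plain Iterator of message batches stands in for the stream. In the real code the iterator would come from the Kafka stream, where hasNext may block, so this only illustrates the deadline logic:

```scala
import scala.annotation.tailrec

object PollSketch {
  // Hypothetical helper, not part of any Kafka API: drains batches from
  // `batches` until the iterator is exhausted or the deadline has passed.
  // `now` is injectable so the behaviour can be checked without sleeping.
  def pollUntil(
      batches: Iterator[Seq[String]],
      deadlineMillis: Long,
      now: () => Long = () => System.currentTimeMillis()
  ): Seq[String] = {
    @tailrec
    def loop(acc: Vector[String]): Vector[String] =
      // Check the clock first so that no batch is consumed after the deadline.
      if (now() < deadlineMillis && batches.hasNext) loop(acc ++ batches.next())
      else acc

    loop(Vector.empty)
  }
}
```

The loop keeps going only while the deadline has not yet passed, and it returns whatever has been collected so far as soon as either condition fails. Anything still left in the iterator stays unconsumed for the next call.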
