Yes, ConsumerIterator blocks when there is no new message. This is done by calling take() on a blocking queue.
Thanks, Jun On Sun, Jan 5, 2014 at 5:53 PM, S Ahmed <sahmed1...@gmail.com> wrote: > I'm trying to trace through the codebase and figure out where exactly the > block occurs in the high level consumer? > > public void run() { > ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); > while (it.hasNext()) > System.out.println("Thread " + m_threadNumber + ": " + new > String(it.next().message())); > System.out.println("Shutting down Thread: " + m_threadNumber); > } > Reference: > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example > > So from what I understand, the while.it.hasNext() will block if there are > no new messages for this particular topic/partion correct? > > Just to understand, can someone clarify where in the kafka source this > block occurs, i.e. the broker that this consumer is connected to will keep > a socket connection open to this consumer and block until a new message > that is owned by this consumer thread arrives and then pushes it to the > consumer to process. > > Is it at the iterator level somewhere? > > https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsumerIterator.scala >