Hi, I am using high level consumer and for every 10 secs I see that consumerInstance exists the below while loop:
ConsumerIterator<byte[], byte[]> it = stream.iterator(); CustomMessage customMessage; while(it.hasNext()) { customMessage = deSerializeObject(it.next().message()); .... } *Note*: # Producer sends a serialized message and consumers de-serializes once the receives the message # During the 10 sec interval consumer receives the messages # I am using 1 topic with 1 partition and 1 consumer Instance makes a consumer *The zookeeper logs show this*: [2015-07-10 17:30:18,293] INFO Accepted socket connection from / 127.0.0.1:36138 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2015-07-10 17:30:18,295] INFO Client attempting to establish new session at /127.0.0.1:36138 (org.apache.zookeeper.server.ZooKeeperServer) [2015-07-10 17:30:18,306] INFO Established session 0x14e7a65093b0010 with negotiated timeout 60000 for client /127.0.0.1:36138 (org.apache.zookeeper.server.ZooKeeperServer) .... after ~10 secs .... [2015-07-10 17:30:28,740] INFO Processed session termination for sessionid: 0x14e7a65093b0010 (org.apache.zookeeper.server.PrepRequestProcessor) [2015-07-10 17:30:28,752] INFO Closed socket connection for client / 127.0.0.1:36138 which had sessionid 0x14e7a65093b0010 (org.apache.zookeeper.server.NIOServerCnxn) I also see that, "with negotiated timeout 60000 for client", Q1) which config is set to 6000ms? My consumer cofigs looks like this(the following values needs to be fine tuned but these are the test config's that I used): zookeeper.session.timeout.ms: 140000 zookeeper.sync.time.ms: 200 auto.commit.interval.ms: 1000 offsets.storage: kafka Q2) Could some one help if I am missing anything fundamentally?. Please answer my two questions, mentioned above. Thank you, Siva.