I have written a high level Kafka consumer which is not responding on event sent after 2 hours.
Only error I see in broker log like this. 2017-08-01 05:29:42,102 INFO kafka.network.Processor: Closing socket connection to /10.202.138.126. 2017-08-01 07:34:24,957 ERROR kafka.network.Processor: Closing socket for /XX:XXX.110.61 because of error java.io.IOException: Connection timed out at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:445) at kafka.network.Processor.run(SocketServer.scala:341) at java.lang.Thread.run(Thread.java:745) I have written a simple Kafka High Level consumer. I have not specified any value for the consumer.timeout.ms. Therefore assuming consumer will never timeout when no message is received from the producer. I have observed that when we do not receive any message from producer for sometime the consumer stop responding to any message that is received after say 10 min. I have set zookeeper.sync.time.ms=6000 and zookeeper.session.timeout.ms=500000 High level Consumer: Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); try{ executor = Executors.newFixedThreadPool(threadCount); int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new EmailProcessorThread(stream, threadNumber, context, redisTemplate )); //Thread RUN public void run() { while(true){ ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { try{ ObjectMapper mapper = new ObjectMapper(); String recordValue = new String(it.next().message()); LOG.info("Message received from the topic is " + recordValue);