I have written a high level Kafka consumer which is not responding on event 
sent after 2 hours.

Only error I see in broker log like this.

    2017-08-01 05:29:42,102 INFO kafka.network.Processor: Closing socket 
connection to /10.202.138.126.
    2017-08-01 07:34:24,957 ERROR kafka.network.Processor: Closing socket for 
/XX:XXX.110.61 because of error
    java.io.IOException: Connection timed out
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
            at kafka.utils.Utils$.read(Utils.scala:380)
            at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
            at kafka.network.Processor.read(SocketServer.scala:445)
            at kafka.network.Processor.run(SocketServer.scala:341)
            at java.lang.Thread.run(Thread.java:745)

I have written a simple Kafka High Level consumer.  I have not specified any 
value for the  consumer.timeout.ms.  Therefore assuming consumer will never 
timeout when no message is received from the producer.  I have observed that 
when we do not receive any message from producer for sometime the consumer stop 
responding to any message that is received after say 10 min.  I have set 
zookeeper.sync.time.ms=6000
and zookeeper.session.timeout.ms=500000


High level Consumer:

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = 
consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = 
consumerStreams.get(topic);
            try{
                        executor = Executors.newFixedThreadPool(threadCount);

                        int threadNumber = 0;
                        for (final KafkaStream<byte[], byte[]> stream : 
streams) {
                            executor.submit(new EmailProcessorThread(stream, 
threadNumber, context, redisTemplate ));
    //Thread RUN
     public void run() {
                                while(true){
                            ConsumerIterator<byte[], byte[]> it = 
stream.iterator();
                            while (it.hasNext()) {
                                try{
                                                ObjectMapper mapper = new 
ObjectMapper();
                                                                String 
recordValue = new String(it.next().message());
                                                LOG.info("Message received from 
the topic is " + recordValue);


Reply via email to