I want to know how to tune/setup high level Kafka Client to a Kafka server in 
EC2 I set zookeeper.session.timeout.ms=50000. I found that after some time I 
got following error in the logs. I want to know how to tune Kafka parameters to 
run the consumer for ever. I checked and found ZK is running just fine only 
Kafka consumer is throwing exception. I am using 0.8.2.2 version of Kafka 
Client because my production Kafka server in EC2 is using 8.2.2.2 version of 
Kafka.



executor = Executors.newFixedThreadPool(threadCount);
        for (final KafkaStream stream : streams) {
            executor.submit(new EmailProcessorThread(stream, threadNumber, 
context ,  redisTemplate));
        }

public void run() {
            while(true){
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    try{
                        LOG.info("Message received from the topic is " + new 
String(it.next().message()));
                        ObjectMapper mapper = new ObjectMapper();
                        String recordValue = new String(it.next().message());
                        Email email = mapper.readValue(recordValue, 
Email.class);
                        System.out.println("&&&&&&&&^^^^^^^^^ Email data 
received for processing is " + email.toString());
                        MessageListener listener = new MessageListener();
                        listener.processEmail( email, redisTemplate, context) ;
                    }catch(Throwable e){
                        e.printStackTrace();
                       LOG.info("Error received while processing message " + e);
                    }

                }
            }
        }

2017-07-23 00:10:13.488  WARN 48209 --- [main-SendThread(10.202.138.126:2181)] 
org.apache.zookeeper.ClientCnxn          : Session 0x15ac97701bb91f0 for server 
<IP ADDRESS>:2181, unexpected error, closing socket connection and attempting 
reconnect

    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_91]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[na:1.8.0_91]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[na:1.8.0_91]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_91]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
~[na:1.8.0_91]
        at 
org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68) 
~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
        at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
 ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) 
~[zookeeper-3.4.6.jar!/:3.4.6-1569965]

Reply via email to