kacper chwialkowski created KAFKA-888:
-----------------------------------------

             Summary: problems when shutting down the java consumer .
                 Key: KAFKA-888
                 URL: https://issues.apache.org/jira/browse/KAFKA-888
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
         Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 
x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2 

            Reporter: kacper chwialkowski
            Assignee: Neha Narkhede
            Priority: Minor


I got the following error when shutting down the consumer :

ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] 
INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: 
java.nio.channels.ClosedByInterruptException: null
        at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
 ~[na:1.7.0_21]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) 
~[na:1.7.0_21]
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) 
~[na:1.7.0_21]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[na:1.7.0_21]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[na:1.7.0_21]
        at kafka.utils.Utils$.read(Utils.scala:394) 
~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56) 
~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) 
~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
 [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 
[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]


and this is how I create my Consumer 

      public Boolean call() throws Exception {
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            it.next();
            LOGGER.info("Received the message. Shutting down");
            consumer.commitOffsets();
            consumer.shutdown();
            return true;
        }


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to