[ 
https://issues.apache.org/jira/browse/KAFKA-888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13644929#comment-13644929
 ] 

Jun Rao commented on KAFKA-888:
-------------------------------

The logging itself is harmless: we interrupt the fetcher thread when shutting 
down the consumer, so this exception is expected. Other than the logging, did 
your consumer shut down successfully? Also, normally you iterate over and 
consume messages in a separate thread. See the example in 
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
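
To illustrate why an interrupt shows up as the exception in the log, here is a 
minimal sketch that uses only the JDK and no Kafka classes. A 
java.nio.channels.Pipe stands in for the broker socket and a plain thread 
stands in for the fetcher; the class name InterruptedFetchDemo, the method 
runDemo, and the thread name are made up for this illustration and are not 
part of Kafka. The reading runs in its own thread, and the "shutdown" is an 
interrupt issued from the calling thread.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedFetchDemo {

    /** Blocks a stand-in fetcher thread in a channel read, interrupts it, and reports what it saw. */
    static String runDemo() throws Exception {
        // Assumption: a Pipe replaces the broker connection. Nothing is ever
        // written to it, so the read below blocks like a pending fetch request.
        Pipe pipe = Pipe.open();
        AtomicReference<String> outcome = new AtomicReference<>("no exception");
        CountDownLatch started = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                started.countDown();
                pipe.source().read(ByteBuffer.allocate(16));
                outcome.set("read returned");
            } catch (ClosedByInterruptException e) {
                // Interrupting a thread that is blocked on an interruptible
                // channel closes the channel and raises this exception.
                outcome.set("ClosedByInterruptException");
            } catch (IOException e) {
                outcome.set("other IOException: " + e);
            }
        }, "stand-in-fetcher");

        fetcher.start();
        started.await();
        Thread.sleep(200);    // let the thread settle into the blocking read
        fetcher.interrupt();  // stand-in for what shutdown does to the fetcher
        fetcher.join(5000);
        pipe.sink().close();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fetcher saw: " + runDemo());
    }
}
```

The stand-in thread ends up in the ClosedByInterruptException branch, which 
matches the exception in the report: it is a side effect of the interrupt, 
not by itself a sign that the shutdown failed.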
                
> problems when shutting down the java consumer .
> -----------------------------------------------
>
>                 Key: KAFKA-888
>                 URL: https://issues.apache.org/jira/browse/KAFKA-888
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>         Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 
> x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2 
>            Reporter: kacper chwialkowski
>            Assignee: Neha Narkhede
>            Priority: Minor
>              Labels: bug, consumer, exception
>
> I got the following error when shutting down the consumer:
> ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: 
> java.nio.channels.ClosedByInterruptException: null
>       at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_21]
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) ~[na:1.7.0_21]
>       at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) ~[na:1.7.0_21]
>       at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_21]
>       at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_21]
>       at kafka.utils.Utils$.read(Utils.scala:394) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> And this is how I create my consumer:
>     public Boolean call() throws Exception {
>         Map<String, Integer> topicCountMap = new HashMap<>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>             consumer.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         it.next();
>         LOGGER.info("Received the message. Shutting down");
>         consumer.commitOffsets();
>         consumer.shutdown();
>         return true;
>     }

