[ 
https://issues.apache.org/jira/browse/KAFKA-900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-900:
--------------------------

       Resolution: Fixed
    Fix Version/s: 0.8
           Status: Resolved  (was: Patch Available)

Thanks for the review. Committed to 0.8.
                
> ClosedByInterruptException when high-level consumer shutdown normally
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-900
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Jason Rosenberg
>            Assignee: Jun Rao
>             Fix For: 0.8
>
>         Attachments: kafka-900.patch
>
>
> I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, 
> all embedded in the same java process: 
> -- spins up a zk instance 
> -- spins up a kafka server using a fresh log directory 
> -- creates a producer and sends a message 
> -- creates a high-level consumer and verifies that it can consume the message 
> -- shuts down the consumer 
> -- stops the kafka server 
> -- stops zk 
> The test seems to be working fine now, however, I consistently see the 
> following exception, when the consumer connector is shutdown:
> 1699 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683]
>  WARN kafka.consumer.ConsumerFetcherThread  - 
> [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: 
> group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683;
>  ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: 
> [test-topic,0] -> PartitionFetchInfo(1,1048576)
> java.nio.channels.ClosedByInterruptException
>       at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
>       at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer  - Shutting down 
> KafkaServer
> 2030 [main] INFO com.squareup.kafka.server.KafkaServer  - Shut down complete 
> for KafkaServer
> Disconnected from the target VM, address: '127.0.0.1:49243', transport: 
> 'socket'
> It would be great if instead, something meaningful was logged, like:
> "Consumer connector has been shutdown"

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to