[ https://issues.apache.org/jira/browse/KAFKA-900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao updated KAFKA-900: -------------------------- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Thanks for the review. Committed to 0.8. > ClosedByInterruptException when high-level consumer shutdown normally > --------------------------------------------------------------------- > > Key: KAFKA-900 > URL: https://issues.apache.org/jira/browse/KAFKA-900 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.8 > Reporter: Jason Rosenberg > Assignee: Jun Rao > Fix For: 0.8 > > Attachments: kafka-900.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, > all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exception, when the consumer connector is shutdown: > 1699 > [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683] > WARN kafka.consumer.ConsumerFetcherThread - > [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683], > Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: > group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683; > ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: > [test-topic,0] -> PartitionFetchInfo(1,1048576) > java.nio.channels.ClosedByInterruptException > at > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47) > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > 1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer - Shutting down > KafkaServer > 2030 [main] INFO com.squareup.kafka.server.KafkaServer - Shut down complete > for KafkaServer > Disconnected from the target VM, address: '127.0.0.1:49243', transport: > 'socket' > It would be great if instead, something meaningful was logged, like: > "Consumer connector has been shutdown" -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira