[ https://issues.apache.org/jira/browse/KAFKA-2770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-2770.
----------------------------------
    Resolution: Fixed

Issue resolved by pull request 470
[https://github.com/apache/kafka/pull/470]

> Race condition causes Mirror Maker to hang during shutdown (new consumer)
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-2770
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2770
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Geoff Anderson
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 0.9.0.0
>
>
> I recently added clean bounce with new consumer to the mirror maker tests
> (https://github.com/apache/kafka/pull/427), and noticed that in this case the
> mirror maker process (with new consumer) sometimes hangs and fails to stop
> when stopped with kill -15.
> {code:title=mirror_maker.log|borderStyle=solid}
> [2015-11-06 22:06:04,213] INFO Start clean shutdown. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,221] INFO Shutting down consumer threads. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,239] INFO [mirrormaker-thread-0] mirrormaker-thread-0 shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,253] INFO [mirrormaker-thread-0] Flushing producer. (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,254] INFO [mirrormaker-thread-0] Committing consumer offsets. (kafka.tools.MirrorMaker$MirrorMakerThread)
> Exception in thread "mirrormaker-thread-0" org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:304)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:194)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:154)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:347)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:895)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:869)
>     at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.commit(MirrorMaker.scala:522)
>     at kafka.tools.MirrorMaker$.commitOffsets(MirrorMaker.scala:338)
>     at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:406)
> [2015-11-06 22:06:29,448] DEBUG Connection with worker4/192.168.50.104 disconnected (org.apache.kafka.common.network.Selector)
> java.io.EOFException
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:288)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The current working hypothesis is this:
> a WakeupException is being triggered during the finally block in mirror maker
> by the call to commitOffsets, and the mirror maker thread dies before the
> call to shutdownLatch.countDown(). Therefore the shutdownLatch.await() call
> in awaitShutdown() blocks forever and the process never exits.
>
> Why can commitOffsets trigger a wakeup exception?
> The shutdown hook is triggered in another thread, and does this:
>     shuttingDown = true
>     mirrorMakerConsumer.stop()  # Calls consumer.wakeup()
> If the timing is right (wrong), the wakeup flag is set, but the mirrormaker
> produce/consume loop exits without triggering the WakeupException, and the
> WakeupException isn't thrown until commitOffsets() is called in the finally
> block.
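
The hypothesis above can be illustrated with a small, Kafka-free Java sketch. Everything in it is an assumption made for illustration: FakeConsumer and FakeWakeupException are hypothetical stand-ins for the new consumer and org.apache.kafka.common.errors.WakeupException, and the guarded branch shows one possible way to make sure the latch is always released. It is not necessarily what pull request 470 does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownLatchSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class FakeWakeupException extends RuntimeException {}

    // Hypothetical stand-in for the consumer: wakeup() sets a flag, and the
    // next blocking call (here, commit) throws and clears that flag.
    static class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

        void wakeup() {
            wakeupPending.set(true);
        }

        void commit() {
            if (wakeupPending.getAndSet(false)) {
                throw new FakeWakeupException();
            }
        }
    }

    // Simulates one worker thread and reports whether the latch was released.
    static boolean latchReleased(boolean guardCommit) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        // The shutdown hook has fired, but the produce/consume loop exited
        // without making another blocking call, so the wakeup is still pending.
        consumer.wakeup();

        Thread worker = new Thread(() -> {
            try {
                // produce/consume loop has already exited
            } finally {
                if (guardCommit) {
                    try {
                        consumer.commit();
                    } catch (FakeWakeupException e) {
                        // Pending wakeup consumed; a real tool might retry the commit here.
                    } finally {
                        shutdownLatch.countDown(); // always reached
                    }
                } else {
                    consumer.commit();         // throws, so the thread dies here
                    shutdownLatch.countDown(); // never reached
                }
            }
        });
        worker.setUncaughtExceptionHandler((t, e) -> { }); // keep the demo output quiet
        worker.start();
        worker.join();

        // A bounded wait stands in for the unbounded shutdownLatch.await().
        return shutdownLatch.await(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unguarded commit, latch released: " + latchReleased(false));
        System.out.println("guarded commit, latch released:   " + latchReleased(true));
    }
}
```

In the unguarded case the latch is never counted down, which corresponds to awaitShutdown() blocking forever. In the guarded case countDown() sits in its own finally block, so the latch is released even when the commit throws.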