[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532 ]
Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 9:21 PM:
-----------------------------------------------------------------

[~ewencp], also, regarding KafkaProducer.close(): the method hangs forever because of the following loop:

{code}
// Sender.java
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

// KafkaProducer.java
/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join(); // THIS IS BLOCKED since ioThread does not give up.
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is a likely factor, but if you look at the close() call stack, the thread that called close() will hang until the I/O thread dies, and the I/O thread never dies while there is unsent data and the network is down.

Thanks,
Bhavesh

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1642
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1642
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.8.2
>            Reporter: Bhavesh Mistry
>            Assignee: Ewen Cheslack-Postava
>             Fix For: 0.8.2
>
>         Attachments: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?
>
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
>     at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
>     at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
>     at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>     at java.lang.Thread.run(Thread.java:744)
>
> Thanks,
> Bhavesh

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
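The close() hang discussed in the comment comes from combining an unbounded `ioThread.join()` with a drain loop whose only exit condition is "nothing left to send". Below is a minimal sketch of one way to bound it, assuming a close timeout plus a force-close flag that the drain loop checks. `DrainingSender`, `close(Thread, DrainingSender, long)` and `closeDemo` are hypothetical illustration names, not Kafka's API, and the simulated poll simply sleeps instead of doing network I/O.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of a bounded close(). DrainingSender is a hypothetical stand-in for
 * the producer's I/O thread, not Kafka's actual Sender class.
 */
public class BoundedCloseSketch {

    static final class DrainingSender implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicBoolean forced = new AtomicBoolean(false);
        // Simulates accumulator.hasUnsent() staying true while the network is down.
        private final boolean hasUnsent;

        DrainingSender(boolean hasUnsent) {
            this.hasUnsent = hasUnsent;
        }

        @Override
        public void run() {
            // Normal operation: keep polling until close is initiated.
            while (running.get()) {
                pollOnce();
            }
            // Drain phase: like the quoted Sender loop, but it also checks a
            // force-close flag, so the loop can be abandoned.
            while (hasUnsent && !forced.get()) {
                pollOnce();
            }
        }

        void initiateClose() { running.set(false); }

        void forceClose() { forced.set(true); }

        private void pollOnce() {
            try {
                Thread.sleep(10); // stands in for a network poll that never completes the sends
            } catch (InterruptedException e) {
                forced.set(true);
                running.set(false);
            }
        }
    }

    /**
     * Hypothetical bounded close: waits up to timeoutMs for a graceful drain,
     * then forces the I/O thread to stop. Returns true if the drain finished in time.
     */
    static boolean close(Thread ioThread, DrainingSender sender, long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive (join(0) waits forever)");
        }
        sender.initiateClose();
        try {
            ioThread.join(timeoutMs); // bounded wait instead of an unbounded join()
            if (ioThread.isAlive()) {
                sender.forceClose(); // give up on the unsent data
                ioThread.join();     // returns promptly: the drain loop now exits
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            sender.forceClose();
            return false;
        }
    }

    /** Starts a simulated I/O thread, then closes it with the given timeout. */
    static boolean closeDemo(boolean networkDown, long timeoutMs) {
        DrainingSender sender = new DrainingSender(networkDown);
        Thread ioThread = new Thread(sender, "demo-producer-network-thread");
        ioThread.setDaemon(true);
        ioThread.start();
        return close(ioThread, sender, timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println("network up, graceful close: " + closeDemo(false, 1000));
        System.out.println("network down, forced close: " + closeDemo(true, 100));
    }
}
```

Running `main` prints `true` for the first case and `false` for the second: with the network "down" the caller returns after roughly the timeout instead of hanging. The trade-off of this design is that a forced close silently drops whatever was still unsent, so a real implementation would need to fail those records' callbacks rather than leave them pending.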
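The stack trace points to a likely reason for the 100% CPU: `Sender.run()` throws the same `IllegalStateException` on every pass, the catch block only logs it, and the loop retries immediately, so the thread spins and floods the log. One hypothetical mitigation is to pause with capped exponential backoff after consecutive failures. This is a sketch under that assumption, not the patch attached to this issue; `backoffMs` and `runWithBackoff` are illustration names.

```java
/**
 * Sketch of capped exponential backoff for an I/O loop that keeps failing.
 * Hypothetical helper, not the fix attached to KAFKA-1642.
 */
public class ErrorBackoffSketch {

    /** Pause after the n-th consecutive error: baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs. */
    static long backoffMs(int consecutiveErrors, long baseMs, long maxMs) {
        if (consecutiveErrors <= 0) {
            return 0;
        }
        long delay = baseMs;
        for (int i = 1; i < consecutiveErrors && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    /** Runs step `iterations` times, pausing after failures; returns the total pause in ms. */
    static long runWithBackoff(Runnable step, int iterations, long baseMs, long maxMs) {
        int consecutiveErrors = 0;
        long totalPausedMs = 0;
        for (int i = 0; i < iterations; i++) {
            try {
                step.run();
                consecutiveErrors = 0; // a successful iteration resets the backoff
            } catch (RuntimeException e) {
                consecutiveErrors++;
                long pause = backoffMs(consecutiveErrors, baseMs, maxMs);
                totalPausedMs += pause;
                // A real loop would log e here; the pause also throttles the log volume.
                try {
                    Thread.sleep(pause);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return totalPausedMs;
    }

    public static void main(String[] args) {
        Runnable alwaysFails = () -> {
            throw new IllegalStateException("No entry found for node -2");
        };
        // Pauses of 1, 2, 4 and 4 ms instead of four back-to-back retries.
        System.out.println(runWithBackoff(alwaysFails, 4, 1, 4));
    }
}
```

With a step that always fails, `main` prints `11` (1 + 2 + 4 + 4 ms of pauses). Resetting the counter on success keeps the loop responsive once the network returns, while the cap bounds how stale a retry can get.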