[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257686#comment-14257686 ]
Bhavesh Mistry edited comment on KAFKA-1642 at 12/23/14 11:39 PM:
------------------------------------------------------------------

[~ewencp], the patch does indeed solve the high CPU problem reported in this bug. I have tested with all brokers down, one broker down, and two brokers down (except for the last use case, where one of the brokers runs out of socket file descriptors, which is a rare case). I am sorry for the late response; I got busy with other work, so testing was delayed.

Here are some interesting observations from YourKit:

0) The patch has also brought down overall CPU consumption in the normal, healthy case where everything is up and running. With the old code (without the patch), I used to see about 10% of the process's overall CPU used by the I/O threads (4 in my case); with the patch, this has dropped to 5% or less.

1) When two brokers are down, I occasionally see the I/O thread blocked. (I did not see this when only one broker is down.)
{code}
kafka-producer-network-thread | rawlog [BLOCKED] [DAEMON]
org.apache.kafka.clients.producer.internals.Metadata.fetch() Metadata.java:70
java.lang.Thread.run() Thread.java:744
{code}

2) The record-error-rate metric remains zero despite the following firewall rules. In my opinion, the producer should have called org.apache.kafka.clients.producer.Callback, but I did not see that happen with either one or two brokers down. Should I file another issue for this? Please confirm.
{code}
00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}
{code}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class LoggingCallbackHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata
     *            The metadata for the record that was sent (i.e. the
     *            partition and offset). Null if an error occurred.
     * @param exception
     *            The exception thrown during processing of this record.
     *            Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Only failures are logged; successful sends are ignored.
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}
I do not see any exception at all on the console, and I am not sure why.

3) The application does NOT shut down gracefully when one or more brokers are down. (The I/O thread never exits; this is a known issue.)
{code}
"SIGTERM handler" daemon prio=5 tid=0x00007f8bd79e4000 nid=0x17907 waiting for monitor entry [0x000000011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x000000011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdd147800 nid=0x15d0b waiting for monitor entry [0x000000011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdf820000 nid=0x770b waiting for monitor entry [0x000000011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x00007f8bdc393800 nid=0x1c30f waiting for monitor entry [0x000000011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock <0x000000070008f7c0> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"Thread-4" prio=5 tid=0x00007f8bdb39f000 nid=0xa107 in Object.wait() [0x000000011ea89000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.$$YJP$$wait(Native Method)
	at java.lang.Object.wait(Object.java)
	at java.lang.Thread.join(Thread.java:1280)
	- locked <0x0000000705c2f650> (a org.apache.kafka.common.utils.KafkaThread)
	at java.lang.Thread.join(Thread.java:1354)
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:322)
	at

"kafka-producer-network-thread | error" daemon prio=5 tid=0x00007f8bd814e000 nid=0x7403 runnable [0x000000011e6c0000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.KQueueArrayWrapper.$$YJP$$kevent0(Native Method)
	at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java)
	at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:200)
	at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
	- locked <0x0000000705c109f8> (a sun.nio.ch.Util$2)
	- locked <0x0000000705c109e8> (a java.util.Collections$UnmodifiableSet)
	- locked <0x0000000705c105c8> (a sun.nio.ch.KQueueSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
	at org.apache.kafka.common.network.Selector.select(Selector.java:322)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:212)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
	at java.lang.Thread.run(Thread.java:744)
{code}

Thank you for the fix.

Thanks,
Bhavesh

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network
> connection is lost
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Bhavesh Mistry
> Assignee: Ewen Cheslack-Postava
> Priority: Blocker
> Fix For: 0.8.2
>
> Attachments:
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch,
> KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch,
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while.
> It seems the network I/O threads are very busy logging the following error
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
> 	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> 	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> 	at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> 	at java.lang.Thread.run(Thread.java:744)
>
> Thanks,
> Bhavesh

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
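In the quoted stack trace, `ClusterConnectionStates.disconnected()` fails inside `NetworkClient.initiateConnect()` because node -2 has no entry. The Sender appears to log this as an "Uncaught error" and go straight back into its loop, which would fit the busy logging described in the report. The Java sketch below is a hypothetical, simplified model for illustration only. The class `ConnectionStatesSketch`, its `State` enum, and the `failingConnect` helper are made-up names, not Kafka's code, and this is not the KAFKA-1642 patch. It only shows how the order of bookkeeping could produce this exception. If a node is registered as "connecting" only after the connect call succeeds, a failed connect reaches `disconnected()` for an unknown node. If the node is registered first, the failure path always finds an entry.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of per-node connection bookkeeping.
 * NOT Kafka's ClusterConnectionStates; it only illustrates the failure
 * mode suggested by the "No entry found for node -2" stack trace.
 */
public class ConnectionStatesSketch {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    private final Map<Integer, State> states = new HashMap<>();

    State nodeState(int node) {
        State s = states.get(node);
        if (s == null) {
            throw new IllegalStateException("No entry found for node " + node);
        }
        return s;
    }

    void connecting(int node) {
        states.put(node, State.CONNECTING);
    }

    void disconnected(int node) {
        nodeState(node); // throws IllegalStateException if the node was never registered
        states.put(node, State.DISCONNECTED);
    }

    /** Hypothetical connect attempt that always fails, as when the network is down. */
    static void failingConnect(int node) throws IOException {
        throw new IOException("network unreachable for node " + node);
    }

    /** Registers the node only after a successful connect: a failure hits an unknown node. */
    void initiateConnectRegisterLate(int node) {
        try {
            failingConnect(node);
            connecting(node);
        } catch (IOException e) {
            disconnected(node); // node was never registered, so this throws
        }
    }

    /** Registers the node before connecting: the failure path finds an entry. */
    void initiateConnectRegisterFirst(int node) {
        connecting(node);
        try {
            failingConnect(node);
        } catch (IOException e) {
            disconnected(node); // CONNECTING -> DISCONNECTED
        }
    }

    public static void main(String[] args) {
        ConnectionStatesSketch late = new ConnectionStatesSketch();
        try {
            late.initiateConnectRegisterLate(-2);
        } catch (IllegalStateException e) {
            System.out.println("late registration: " + e.getMessage());
        }

        ConnectionStatesSketch first = new ConnectionStatesSketch();
        first.initiateConnectRegisterFirst(-2);
        System.out.println("early registration: " + first.nodeState(-2));
    }
}
```

Running `main` prints `late registration: No entry found for node -2` followed by `early registration: DISCONNECTED`. In a real I/O loop, the late-registration variant would throw on every iteration while the network stays down. That would be consistent with the repeated error log and CPU spike described above, but this model does not reproduce the spinning itself.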