Yes, please file a JIRA.

On Fri, Aug 26, 2016 at 2:28 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So
> far we had been using it with the plaintext transport, but recently we
> have been considering an upgrade to SSL. It mostly works, except that a
> misconfigured producer (and even consumer) causes a hard-to-relate
> OutOfMemoryError, which puts the JVM in which the client is running into
> a bad state. We can consistently and easily reproduce that OOM. We
> decided to check whether this is something that is fixed in 0.10.0.1, so
> we upgraded one of our test systems to that version (both server and
> client libraries), but we still see the same issue. Here's how it can be
> easily reproduced:
>
> 1. Enable the SSL listener on the broker via server.properties, as per
> the Kafka documentation:
>
>     listeners=PLAINTEXT://:9092,SSL://:9093
>     ssl.keystore.location=<location-of-keystore>
>     ssl.keystore.password=pass
>     ssl.key.password=pass
>     ssl.truststore.location=<location-of-truststore>
>     ssl.truststore.password=pass
>
> 2. Start ZooKeeper and the Kafka server.
>
> 3. Create an "oom-test" topic (which will be used for these tests):
>
>     kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test --partitions 1 --replication-factor 1
>
> 4. Create a simple producer which sends a single message to the topic via
> the new Java producer API:
>
>     import java.util.Properties;
>     import java.util.concurrent.Future;
>
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerConfig;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>     import org.apache.kafka.clients.producer.RecordMetadata;
>     import org.apache.kafka.common.serialization.StringSerializer;
>
>     public class OOMTest {
>
>         public static void main(final String[] args) throws Exception {
>             final Properties kafkaProducerConfigs = new Properties();
>             // NOTE: Intentionally use an SSL port without specifying
>             // security.protocol as SSL
>             kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
>             kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>             kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>             try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProducerConfigs)) {
>                 System.out.println("Created Kafka producer");
>                 final String topicName = "oom-test";
>                 final String message = "Hello OOM!";
>                 // send a message to the topic and block until it completes
>                 final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, message));
>                 final RecordMetadata sentRecordMetadata = recordMetadataFuture.get();
>                 System.out.println("Sent message '" + message + "' to topic '" + topicName + "'");
>             }
>             System.out.println("Tests complete");
>         }
>     }
>
> Notice that the server URL is using the SSL endpoint localhost:9093 but
> isn't specifying any of the other necessary SSL configs, like
> security.protocol (a sketch of the missing settings follows right after
> step 5).
>
> 5. For the sake of easily reproducing this issue, run this class with a
> max heap size of 256MB (-Xmx256M).
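As an aside, the missing client-side SSL settings referenced in step 4 would
look roughly like the following. This is only a sketch: the truststore path
and password are placeholders, while CommonClientConfigs
(org.apache.kafka.clients.CommonClientConfigs) and SslConfigs
(org.apache.kafka.common.config.SslConfigs) supply the standard config key
names.

    // Additions to OOMTest.main, before constructing the KafkaProducer:
    // tell the client that localhost:9093 speaks SSL rather than plaintext
    kafkaProducerConfigs.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    // truststore holding the broker's certificate (placeholder values)
    kafkaProducerConfigs.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
    kafkaProducerConfigs.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pass");

With those set (and a truststore that actually trusts the broker's
certificate), the same class should complete normally against port 9093.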
> Running this code throws the following OutOfMemoryError in one of the
> Sender threads:
>
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
>
> Note that I set the heap size to 256MB to reproduce this easily, but the
> issue isn't specific to that size. We have been able to reproduce it at
> even 516MB and higher too.
>
> This even happens with the consumer, and in fact it can be reproduced out
> of the box with the kafka-consumer-groups.sh script. All you have to do
> is run that tool as follows:
>
>     ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 --new-consumer
>
> Error while executing consumer group command Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>     at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>     at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>     at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:117)
>     at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:121)
>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:311)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>
> Notice that here again I'm using the new consumer and the SSL port
> without any additional SSL configs.
>
> Once this OOM occurs, the producer is useless, since its (background)
> Sender thread is dead. Not just that: since we run these producers and
> consumers from within our application, this OOM puts our whole JVM into
> an unstable state.
>
> Debugging shows that the NetworkReceive class, in its
> readFromReadableChannel method, receives a size value of something like
> 352518912 and then goes ahead and allocates a ByteBuffer of that size.
> That 352518912 is about 336MB and obviously causes allocation issues. I
> suspect the value being passed over the channel is incorrect.
>
> Of course this exception is triggered by a user config error, but given
> that it ends up as a hard-to-diagnose OOM and puts the JVM into a bad
> state, is there a way the Kafka Java library can handle this better?
> Should I file a JIRA for this? I can share a sample application on
> GitHub, if it helps.
>
> -Jaikiran
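As to the 352518912 value: it is consistent with a plaintext client
misreading a TLS record header as a Kafka size prefix. A TLS 1.2 alert
record starts with content type 21 (0x15) and protocol version 3.3
(0x03 0x03), so the first four bytes a confused plaintext client reads as a
big-endian int32 "response size" come out to exactly 352518912. A
self-contained illustration of that arithmetic (hypothetical class, not
Kafka code):

    import java.nio.ByteBuffer;

    public class TlsHeaderMisread {
        public static void main(final String[] args) {
            // First four bytes of a TLS 1.2 alert record: content type 21
            // (alert), version 3.3, then the first byte of the record length
            final byte[] header = {0x15, 0x03, 0x03, 0x00};
            // Read them the way a size-prefixed protocol would:
            // as a big-endian 32-bit integer
            final int misreadSize = ByteBuffer.wrap(header).getInt();
            System.out.println(misreadSize); // prints 352518912, about 336MB
        }
    }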