Yes, please file a JIRA.

On Fri, Aug 26, 2016 at 2:28 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

> We have been using Kafka 0.9.0.1 (server and Java client libraries). So
> far we had been using it with plaintext transport but recently have been
> considering upgrading to using SSL. It mostly works except that a
> mis-configured producer (and even consumer) causes a hard to relate
> OutOfMemory exception and thus causing the JVM in which the client is
> running, to go into a bad state. We can consistently reproduce that OOM
> very easily. We decided to check if this is something that is fixed in
> 0.10.0.1 so upgraded one of our test systems to that version (both server
> and client libraries) but still see the same issue. Here's how it can be
> easily reproduced
>
>
> 1. Enable SSL listener on the broker via server.properties, as per the
> Kafka documentation
>
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=<location-of-keystore>
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=<location-of-truststore>
> ssl.truststore.password=pass
>
>
> 2. Start zookeeper and kafka server
>
> 3. Create a "oom-test" topic (which will be used for these tests):
>
> kafka-topics.sh --zookeeper localhost:2181 --create --topic *oom-test*
> --partitions 1 --replication-factor 1
>
> 4. Create a simple producer which sends a single message to the topic via
> Java (new producer) APIs:
>
> public class OOMTest {
>
>     public static void main(final String[] args) throws Exception {
>         final Properties kafkaProducerConfigs = new Properties();
> *        // NOTE: Intentionally use a SSL port without specifying
> security.protocol as SSL**
> **kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9093");**
> * kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
>         try (KafkaProducer<String, String> producer = new
> KafkaProducer<>(kafkaProducerConfigs)) {
>             System.out.println("Created Kafka producer");
>             final String topicName = "oom-test";
>             final String message = "Hello OOM!";
>             // send a message to the topic
>             final Future<RecordMetadata> recordMetadataFuture =
> producer.send(new ProducerRecord<>(topicName, message));
>             final RecordMetadata sentRecordMetadata =
> recordMetadataFuture.get();
>             System.out.println("Sent message '" + message + "' to topic '"
> + topicName + "'");
>         }
>         System.out.println("Tests complete");
>
>     }
> }
>
> Notice that the server URL is using a SSL endpoint localhost:9093 but
> isn't specifying any of the other necessary SSL configs like
> security.protocol.
>
> 5. For the sake of easily reproducing this issue run this class with a max
> heap size of 256MB (-Xmx256M). Running this code throws up the following
> OutOfMemoryError in one of the Sender threads:
>
> *18:33:25,770 ERROR [KafkaThread] - Uncaught exception in
> kafka-producer-network-thread | producer-1: **
> **java.lang.OutOfMemoryError: Java heap space**
> *    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReada
> bleChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(
> NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaCh
> annel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChann
> el.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sende
> r.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sende
> r.java:128)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Note that I set it to 256MB as heap size to easily reproduce it but this
> isn't specific to that size. We have been able to reproduce it at even
> 516MB and higher too.
>
> This even happens with the consumer and in fact can be reproduced out of
> the box with the kafka-consumer-group.sh script. All you have to do is run
> that tool as follows:
>
>
> *./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093
> --new-consumer*
>
> Error while executing consumer group command Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReada
> bleChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(
> NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaCh
> annel.java:154)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChann
> el.java:135)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(
> Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
> lient.poll(ConsumerNetworkClient.java:163)
>     at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminC
> lient.scala:49)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminCl
> ient.scala:61)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminCl
> ient.scala:58)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>     at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>     at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>     at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.
> scala:117)
>     at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(
> AdminClient.scala:121)
>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.l
> ist(ConsumerGroupCommand.scala:311)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.
> scala:63)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>
>
> Notice that here again I'm using the new consumer and the SSL port without
> any additional SSL configs.
>
> Once this OOM occurs, the producer is useless since it's (background)
> sender thread is dead. Not just that, since we run these
> producers/consumers from within our application, this OOM trips the JVM and
> our whole JVM goes into an unstable state.
>
> Debugging shows that the NetworkReceive class in its
> readFromReadableChannel method receives a value of something like 352518912
> and then goes ahead to allocate a ByteBuffer of that size. This 352518912
> is approximately 300 odd MB and obviously causes allocation issues. I
> suspect the value being passed over the channel is incorrect.
>
> Of course this exception is triggered by a user config error but given
> that ends up in a (almost unclear) OOM and causing the JVM to go in a bad
> state, is there a way the Kafka Java library can handle this better? Should
> I file a JIRA for this? I can share a sample application on github, if it
> helps.
>
> -Jaikiran
>

Reply via email to