Thank you for explaining that, it makes sense.

-Jaikiran
On Friday 26 August 2016 09:02 PM, Rajini Sivaram wrote:
When a port is configured for SSL, the broker expects SSL handshake messages
before any Kafka requests are sent. But since the client is using PLAINTEXT
on an SSL port, the client interprets the SSL handshake protocol messages
as Kafka requests. Hence the size (300MB) being allocated doesn't really
correspond to a real size field. Limiting the maximum buffer size would avoid
the OOM in this case.
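(To illustrate the above with a rough sketch, not actual Kafka code: the client reads a 4-byte big-endian size prefix and then allocates a buffer of that size, so when the broker replies with TLS records, their first bytes get decoded as a huge "size". The MAX_RECEIVE_SIZE cap below is hypothetical, not an existing Kafka configuration, but it shows how limiting the buffer size would turn the OOM into a per-connection error.)

    import java.io.EOFException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class SizePrefixedReceiveSketch {
        // Hypothetical cap; not an existing Kafka client configuration.
        private static final int MAX_RECEIVE_SIZE = 100 * 1024 * 1024;

        static ByteBuffer readPayload(ReadableByteChannel channel) throws Exception {
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            while (sizeBuffer.hasRemaining()) {
                if (channel.read(sizeBuffer) < 0) {
                    throw new EOFException("Connection closed while reading size prefix");
                }
            }
            sizeBuffer.flip();
            int size = sizeBuffer.getInt();
            // Without a sanity check, TLS record bytes misread as a size lead
            // straight to a huge allocation and an OutOfMemoryError.
            if (size < 0 || size > MAX_RECEIVE_SIZE) {
                throw new IllegalStateException("Invalid receive size: " + size);
            }
            return ByteBuffer.allocate(size);
        }
    }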

On Fri, Aug 26, 2016 at 4:13 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

Hi Rajini,

Just filed a JIRA as suggested:
https://issues.apache.org/jira/browse/KAFKA-4090. More comments inline.

On Friday 26 August 2016 07:53 PM, Rajini Sivaram wrote:

Jaikiran,

At the moment there is no client-side configuration parameter to restrict
the maximum request size on clients. On the broker side,
"socket.request.max.bytes" restricts the maximum buffer size allocated,
protecting the broker from badly behaved or misconfigured clients. A KIP
would be required to add a similar configuration parameter on the
client-side. It will definitely be useful for clients to catch this issue
and report a meaningful error,

I'm wondering what's causing the current code to request around 300MB of
allocation on the client side. Is it just that the number being sent is
incorrect (a corrupt protocol message maybe?) or does it really need that
many bytes?

I don't have good knowledge of the Kafka code so I'm not really sure what kind
of data the broker and client exchange. But wouldn't the broker know beforehand
that a particular port is configured for a certain protocol (SSL in this
case)? If a client (producer/consumer) tries to connect to it with an
unsupported protocol (plaintext in this case), couldn't it just send back an
error code which the client side can understand/parse, and then fail only that
specific consumer/producer? That way the whole JVM need not be
impacted. As I said, I don't have any good knowledge of the Kafka code, so I
might be suggesting something that isn't feasible in terms of
design/implementation in the Kafka library.

-Jaikiran



but I am not sure whether clients would
typically be able to recover from this scenario without restarting the
JVM.
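(For reference: the broker-side guard mentioned above, socket.request.max.bytes, is an ordinary server.properties entry; the value below is its default, roughly 100MB.)

    socket.request.max.bytes=104857600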


On Fri, Aug 26, 2016 at 3:18 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Yes, please file a JIRA.
On Fri, Aug 26, 2016 at 2:28 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

We have been using Kafka 0.9.0.1 (server and Java client libraries). So
far we had been using it with the plaintext transport, but recently have been
considering upgrading to SSL. It mostly works, except that a
misconfigured producer (and even consumer) causes a hard-to-diagnose
OutOfMemoryError, putting the JVM in which the client is running into a bad
state. We can consistently reproduce that OOM very easily. We decided to
check whether this is something that is fixed in 0.10.0.1, so we upgraded one
of our test systems to that version (both server and client libraries), but we
still see the same issue. Here's how it can be easily reproduced:


1. Enable SSL listener on the broker via server.properties, as per the
Kafka documentation

listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=<location-of-keystore>
ssl.keystore.password=pass
ssl.key.password=pass
ssl.truststore.location=<location-of-truststore>
ssl.truststore.password=pass
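(If you are reproducing this from scratch: for a quick test setup, a self-signed keystore whose certificate is imported into a truststore is enough. The file names and aliases below are just examples, loosely following the Kafka security docs; a production setup would use a proper CA.)

    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkeypair -keyalg RSA
    keytool -keystore server.keystore.jks -alias localhost -exportcert -file broker-cert
    keytool -keystore server.truststore.jks -alias broker -importcert -file broker-cert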


2. Start zookeeper and kafka server
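(For completeness, one way to do this is with the scripts shipped in the Kafka distribution, run from the install directory:)

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties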

3. Create an "oom-test" topic (which will be used for these tests):

kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test
--partitions 1 --replication-factor 1

4. Create a simple producer which sends a single message to the topic via
the Java (new producer) API:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class OOMTest {

    public static void main(final String[] args) throws Exception {
        final Properties kafkaProducerConfigs = new Properties();
        // NOTE: Intentionally use an SSL port without specifying security.protocol as SSL
        kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProducerConfigs)) {
            System.out.println("Created Kafka producer");
            final String topicName = "oom-test";
            final String message = "Hello OOM!";
            // send a message to the topic
            final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, message));
            final RecordMetadata sentRecordMetadata = recordMetadataFuture.get();
            System.out.println("Sent message '" + message + "' to topic '" + topicName + "'");
        }
        System.out.println("Tests complete");
    }
}

Notice that the server URL is using an SSL endpoint (localhost:9093) but
isn't specifying any of the other necessary SSL configs like
security.protocol.
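(For comparison, a correctly configured SSL client would additionally set roughly the following properties; the truststore path and password are placeholders matching the server.properties above.)

    security.protocol=SSL
    ssl.truststore.location=<location-of-truststore>
    ssl.truststore.password=pass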

5. For the sake of easily reproducing this issue, run this class with a max
heap size of 256MB (-Xmx256M). Running this code throws up the following
OutOfMemoryError in one of the Sender threads:

18:33:25,770 ERROR [KafkaThread] - Uncaught exception in kafka-producer-network-thread | producer-1:
java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
      at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
      at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
      at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
      at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
      at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
      at java.lang.Thread.run(Thread.java:745)


Note that I set the heap size to 256MB to reproduce this easily, but the issue
isn't specific to that size. We have been able to reproduce it even at 516MB
and higher.

This even happens with the consumer, and can in fact be reproduced out of
the box with the kafka-consumer-groups.sh script. All you have to do is run
that tool as follows:


./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 --new-consumer

Error while executing consumer group command Java heap space
java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
      at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
      at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
      at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
      at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
      at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
      at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
      at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
      at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
      at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
      at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
      at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:117)
      at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:121)
      at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:311)
      at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
      at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)


Notice that here again I'm using the new consumer and the SSL port without
any additional SSL configs.

Once this OOM occurs, the producer is useless since its (background)
sender thread is dead. Not just that: since we run these
producers/consumers from within our application, this OOM hits the whole JVM
and leaves it in an unstable state.

Debugging shows that the NetworkReceive class, in its
readFromReadableChannel method, receives a value of something like 352518912
and then goes ahead to allocate a ByteBuffer of that size. 352518912 is
approximately 300-odd MB and obviously causes allocation issues. I
suspect the value being passed over the channel is incorrect.
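(My own guess at where that number comes from, not something verified against the Kafka code: 352518912 is exactly the bytes 0x15 0x03 0x03 0x00 read as a big-endian int, which looks like the start of a TLS record (content type 21 = alert, version 0x0303 = TLS 1.2). So the "size" is most likely the broker's TLS response being misread as a Kafka size field. A quick check of that arithmetic:)

    public class MisreadSizeCheck {
        public static void main(String[] args) {
            // 0x15 = TLS content type 21 (alert), 0x0303 = TLS 1.2, 0x00 = first length byte
            int misread = java.nio.ByteBuffer.wrap(new byte[]{0x15, 0x03, 0x03, 0x00}).getInt();
            System.out.println(misread); // prints 352518912
        }
    }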

Of course this exception is triggered by a user config error, but given
that it ends up in an (almost opaque) OOM and puts the JVM into a bad
state, is there a way the Kafka Java library can handle this better? Should
I file a JIRA for this? I can share a sample application on GitHub, if it
helps.

-Jaikiran




