guwensheng created KAFKA-19059:
----------------------------------

             Summary: When KafkaClient 3.5.0 is used and attempts to connect to an HTTPS service, OOM occurs on the client JVM.
                 Key: KAFKA-19059
                 URL: https://issues.apache.org/jira/browse/KAFKA-19059
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.5.0
            Reporter: guwensheng
         Attachments: AdminClient 出现OOM问题_en.md

When I tried to establish an AdminClient connection to a non-Kafka service using KafkaClient 3.5.0, the client JVM ran out of memory (OOM). My Java code is:

```
try (AdminClient adminClient = makeAdminClient(config)) {
    if (CollectionUtils.isEmpty(kafkaTestQueryInfo.getTopic())) {
        DescribeClusterResult describeClusterResult = adminClient.describeCluster();
        KafkaFuture<Node> controller = describeClusterResult.controller();
        Node node = controller.get(MAX_WAIT_MIL, TimeUnit.MILLISECONDS);
        log.debug("kafka connection test success. controller node Id: {}", node.id());
        return Optional.empty();
    }
    Set<String> topicSet = new HashSet<>(kafkaTestQueryInfo.getTopic());
    DescribeTopicsResult result = adminClient.describeTopics(topicSet);
    Map<String, TopicDescription> topicDescriptionMap = result.allTopicNames()
        .get(MAX_WAIT_MIL, TimeUnit.MILLISECONDS);
    log.info("topicDescriptionMap size: {}", topicDescriptionMap.size());
    return Optional.empty();
}

public AdminClient makeAdminClient(Map<String, Object> config) throws Exception {
    return AdminClient.create(config);
}
```

I use the SASL_PLAINTEXT protocol for Kafka authentication. The config map in the code contains the following values:

```
{security.protocol=SASL_PLAINTEXT, retries=0, metadata.max.age.ms=0,
 request.timeout.ms=10000, sasl.mechanism=PLAIN,
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUseName" password="testPassword";,
 bootstrap.servers=xx.xx.xx.xx:31943, retry.backoff.ms=100,
 default.api.timeout.ms=10000, max.receive.size=10485760}
```
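For reference, the config above can be rebuilt programmatically. This is my own sketch with plain string keys (Kafka also ships `AdminClientConfig` constants for the documented keys); the host and credentials are the placeholders from the report. Note that, as far as I can tell, `max.receive.size` is not a documented AdminClient configuration key, which matters for the analysis below:

```java
import java.util.HashMap;
import java.util.Map;

public class AdminConfigExample {
    // Rebuilds the config map from the report using plain string keys.
    static Map<String, Object> buildConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "xx.xx.xx.xx:31943"); // placeholder host from the report
        config.put("security.protocol", "SASL_PLAINTEXT");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"testUseName\" password=\"testPassword\";");
        config.put("request.timeout.ms", 10000);
        config.put("default.api.timeout.ms", 10000);
        config.put("retries", 0);
        config.put("retry.backoff.ms", 100);
        config.put("metadata.max.age.ms", 0);
        // "max.receive.size" does not appear to be a recognized AdminClient
        // setting, so the client seems to silently ignore it.
        config.put("max.receive.size", 10485760);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().get("security.protocol"));
    }
}
```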

Interestingly, port 31943 does not point to a standard Kafka service, but to a port that serves HTTPS. When execution reaches this line:

```
DescribeClusterResult describeClusterResult = adminClient.describeCluster()
```

the OOM problem occurs. According to the core dump file, the KafkaClient code attempts to allocate a buffer of more than 1 GB, which directly OOMs the JVM process. OOM stack:

```
at java.nio.ByteBuffer.allocate
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate
at org.apache.kafka.common.network.NetworkReceive.readFrom
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate
at org.apache.kafka.common.network.KafkaChannel.prepare
...
```

The problem code is in SaslClientAuthenticator:
https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java

At line 474 of SaslClientAuthenticator, a NetworkReceive object is created, and the OOM occurs in that object's readFrom method:

```
private byte[] receiveResponseOrToken() throws IOException {
    if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
    netInBuffer.readFrom(transportLayer);
    byte[] serverPacket = null;
    if (netInBuffer.complete()) {
        netInBuffer.payload().rewind();
        serverPacket = new byte[netInBuffer.payload().remaining()];
        netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
        netInBuffer = null; // reset the networkReceive as we read all the data
    }
    return serverPacket;
}
```

The problem here is that the NetworkReceive object is constructed with the unbounded constructor:

```
public NetworkReceive(String source)
```

Instead of:

```
public NetworkReceive(int maxSize, String source)
```
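The bounded constructor matters because a maxSize lets readFrom reject an oversized length prefix before any buffer is allocated. A minimal pure-Java sketch of that guard (the class and method names here are my own stand-ins, not Kafka code):

```java
import java.nio.ByteBuffer;

public class SizeGuardDemo {
    static final int UNLIMITED = -1;

    // Stand-in for the maxSize check in NetworkReceive.readFrom: validate the
    // 4-byte big-endian length prefix before allocating anything.
    static int checkedReceiveSize(ByteBuffer sizeBuf, int maxSize) {
        int receiveSize = sizeBuf.getInt();
        if (receiveSize < 0)
            throw new IllegalStateException("Invalid receive (size = " + receiveSize + ")");
        if (maxSize != UNLIMITED && receiveSize > maxSize)
            throw new IllegalStateException(
                "Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
        return receiveSize;
    }

    public static void main(String[] args) {
        // Simulate a bogus ~1.1 GiB length prefix arriving from the peer.
        ByteBuffer prefix = ByteBuffer.allocate(4).putInt(1_213_486_160);
        prefix.flip();
        try {
            checkedReceiveSize(prefix, 10 * 1024 * 1024); // hypothetical 10 MB cap
        } catch (IllegalStateException e) {
            // With a cap, the client fails fast instead of attempting the allocation.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```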

As a result, there is no limit on the maximum amount of memory NetworkReceive may allocate. The readFrom logic in NetworkReceive is the risky code: 
https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java.

```
private final ByteBuffer size;

public static final int UNLIMITED = -1;

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            // The size of the buffer to allocate next is taken from raw network bytes
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize) // maxSize defaults to UNLIMITED (-1)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize;
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) {
        // Here it tries to allocate more than 1 GB, and the JVM OOMs
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }

    return read;
}
```
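To see why the allocation exceeds 1 GB, note that when the peer is not a Kafka broker, its first reply bytes get interpreted as the 4-byte big-endian length prefix. For example, a plaintext HTTP error reply begins with the ASCII bytes "HTTP", which decode to 1,213,486,160 (about 1.1 GiB); the first bytes of a TLS alert record from an HTTPS port decode similarly to hundreds of megabytes. A small sketch of my own (not Kafka code) illustrating the misread:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BogusSizePrefixDemo {
    public static void main(String[] args) {
        // A plaintext HTTP error response starts with the ASCII bytes "HTTP".
        byte[] reply = "HTTP/1.1 400 Bad Request\r\n".getBytes(StandardCharsets.US_ASCII);
        // NetworkReceive.readFrom treats the first 4 bytes as a big-endian length prefix.
        int receiveSize = ByteBuffer.wrap(reply, 0, 4).getInt();
        // 'H','T','T','P' == 0x48545450 == 1,213,486,160 bytes (~1.1 GiB)
        System.out.println("bogus receive size: " + receiveSize);
    }
}
```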

So the core question is why SaslClientAuthenticator, when creating the NetworkReceive object, does not take the max.receive.size in the user config (or any other limit) into account to cap the maximum memory allocation. I therefore think this is a serious bug that can cause client-side OOM.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)
