guwensheng created KAFKA-19059:
----------------------------------

             Summary: When KafkaClient 3.5.0 is used and attempts to connect to an HTTPS service, OOM occurs on the client JVM.
                 Key: KAFKA-19059
                 URL: https://issues.apache.org/jira/browse/KAFKA-19059
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.5.0
            Reporter: guwensheng
         Attachments: AdminClient 出现OOM问题_en.md
When I tried to establish an AdminClient connection to a non-Kafka service using KafkaClient 3.5.0, the client JVM ran out of memory (OOM). My Java code is:

```
try (AdminClient adminClient = makeAdminClient(config)) {
    if (CollectionUtils.isEmpty(kafkaTestQueryInfo.getTopic())) {
        DescribeClusterResult describeClusterResult = adminClient.describeCluster();
        KafkaFuture<Node> controller = describeClusterResult.controller();
        Node node = controller.get(MAX_WAIT_MIL, TimeUnit.MILLISECONDS);
        log.debug("kafka connection test success. controller node Id: {}", node.id());
        return Optional.empty();
    }
    Set<String> topicSet = new HashSet<>(kafkaTestQueryInfo.getTopic());
    DescribeTopicsResult result = adminClient.describeTopics(topicSet);
    Map<String, TopicDescription> topicDescriptionMap = result.allTopicNames()
            .get(MAX_WAIT_MIL, TimeUnit.MILLISECONDS);
    log.info("topicDescriptionMap size: {}", topicDescriptionMap.size());
    return Optional.empty();
}

public AdminClient makeAdminClient(Map<String, Object> config) throws Exception {
    return AdminClient.create(config);
}
```

I used the SASL_PLAINTEXT protocol for Kafka authentication. The config map passed to the client was:

```
{security.protocol=SASL_PLAINTEXT, retries=0, metadata.max.age.ms=0, request.timeout.ms=10000,
 sasl.mechanism=PLAIN,
 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUseName" password="testPassword";,
 bootstrap.servers=xx.xx.xx.xx:31943, retry.backoff.ms=100, default.api.timeout.ms=10000,
 max.receive.size=10485760}
```

Interestingly, port 31943 does not point to a standard Kafka service, but to a port providing HTTPS services. When execution reaches this line:

```
DescribeClusterResult describeClusterResult = adminClient.describeCluster()
```

the OOM problem occurs. According to the core dump file, the KafkaClient code is requesting a very large allocation: it attempts to allocate more than 1 GB of memory.
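The >1 GB figure is no accident: the client interprets the first four bytes of whatever the server sends back as a big-endian length prefix. A minimal sketch of that decoding (the byte values here are illustrative, not taken from the dump — a plain-HTTP reply begins with the ASCII bytes "HTTP", while an HTTPS endpoint answers with a TLS record header, but either way the bytes are not a genuine Kafka length prefix):

```java
import java.nio.ByteBuffer;

public class ReceiveSizeDemo {
    public static void main(String[] args) {
        // A plain-HTTP server's reply begins with the ASCII bytes "HTTP".
        // Read as a big-endian int, the way the client reads its 4-byte
        // length prefix, they decode to roughly 1.13 GiB.
        int httpSize = ByteBuffer.wrap(new byte[]{'H', 'T', 'T', 'P'}).getInt();
        System.out.println(httpSize); // 1213486160

        // A TLS record header (e.g. handshake record, content type 0x16)
        // decodes to hundreds of megabytes - still far too large.
        int tlsSize = ByteBuffer.wrap(new byte[]{0x16, 0x03, 0x03, 0x00}).getInt();
        System.out.println(tlsSize); // 369296128
    }
}
```

Whatever the non-Kafka server replies, the first four bytes almost certainly decode to a huge bogus "size", which the client then tries to allocate.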
This directly causes the JVM process to OOM. OOM stack:

```
at java.nio.ByteBuffer.allocate
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate
at org.apache.kafka.common.network.NetworkReceive.readFrom
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate
at org.apache.kafka.common.network.KafkaChannel.prepare
...............................
```

The problem code is in SaslClientAuthenticator:
https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java

At line 474 of SaslClientAuthenticator, a NetworkReceive object is created, and the OOM occurs in that object's readFrom method:

```
private byte[] receiveResponseOrToken() throws IOException {
    if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
    netInBuffer.readFrom(transportLayer);
    byte[] serverPacket = null;
    if (netInBuffer.complete()) {
        netInBuffer.payload().rewind();
        serverPacket = new byte[netInBuffer.payload().remaining()];
        netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
        netInBuffer = null; // reset the NetworkReceive as we have read all the data
    }
    return serverPacket;
}
```

The problem is that the NetworkReceive object is constructed with:

```
public NetworkReceive(String source)
```

instead of:

```
public NetworkReceive(int maxSize, String source)
```

so there is no limit on the maximum amount of memory NetworkReceive may claim. The readFrom logic in NetworkReceive is the risky code:
https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
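The effect of the missing bound can be shown in isolation. The following is a simplified, self-contained sketch of NetworkReceive's length-prefix validation (BoundedReceiveSketch, validateReceiveSize, and the 10 MiB limit are illustrative stand-ins I made up, not Kafka's actual classes): with a real maxSize the bogus prefix is rejected up front; with UNLIMITED it is accepted and later passed to the allocator.

```java
import java.nio.ByteBuffer;

public class BoundedReceiveSketch {
    static final int UNLIMITED = -1;

    // Simplified stand-in for NetworkReceive's length-prefix check.
    // With maxSize == UNLIMITED (the one-arg constructor) any non-negative
    // size is accepted; with a real bound, oversized prefixes are rejected
    // before any buffer allocation is attempted.
    static int validateReceiveSize(ByteBuffer sizeBuf, int maxSize) {
        int receiveSize = sizeBuf.getInt();
        if (receiveSize < 0)
            throw new IllegalStateException("Invalid receive (size = " + receiveSize + ")");
        if (maxSize != UNLIMITED && receiveSize > maxSize)
            throw new IllegalStateException("Invalid receive (size = " + receiveSize
                    + " larger than " + maxSize + ")");
        return receiveSize;
    }

    public static void main(String[] args) {
        byte[] httpPrefix = {'H', 'T', 'T', 'P'}; // first bytes of an HTTP reply
        // Unbounded, as SaslClientAuthenticator constructs it today: accepted.
        System.out.println(validateReceiveSize(ByteBuffer.wrap(httpPrefix), UNLIMITED));
        // Bounded to 10 MiB, as a config-driven limit could be: rejected.
        try {
            validateReceiveSize(ByteBuffer.wrap(httpPrefix), 10485760);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the one-argument constructor, maxSize stays UNLIMITED, which is exactly the situation in the actual readFrom implementation shown next.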
```
private final ByteBuffer size;
public static final int UNLIMITED = -1;

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt(); // the size of the next allocation is taken from untrusted network bytes
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize) // maxSize defaults to UNLIMITED (-1), so this check never fires
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize;
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) {
        buffer = memoryPool.tryAllocate(requestedBufferSize); // here it tries to allocate the >1 GB buffer, and the OOM occurs
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    return read;
}
```

So the core question is: why does SaslClientAuthenticator not take the max.receive.size from the user config into account when creating the NetworkReceive object, so that the maximum memory allocation is bounded? I therefore consider this a serious bug that can cause OOM problems.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)