[ 
https://issues.apache.org/jira/browse/KAFKA-19427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17985998#comment-17985998
 ] 

RivenSun edited comment on KAFKA-19427 at 6/25/25 5:49 AM:
-----------------------------------------------------------

[~dajac] [~chia7712] thanks for your reply!
The `max.message.bytes` entry appears in the config of __consumer_offsets 
because the topic was created from the broker's default configuration, 
`message.max.bytes=1073741824`. I used the same setting on clusters running 
versions below 4.0 and never saw this problem.
Below is the complete configuration file of my broker machine. Sensitive 
information is replaced with ***
{code:java}
process.roles=broker
node.id=2
controller.quorum.voters=***
controller.listener.names=CONTROLLER

broker.rack=us-east-1b
log.dirs=/asyncmq/kafka/data1
listeners=CLIENT_PRIVATE://:9449,SASL_SSL://:9889,INTERNAL_SSL://:9559,PLAIN_PLUGIN_SSL://:9669
advertised.listeners=***
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL,CLIENT_PRIVATE:SASL_SSL,CONTROLLER:SASL_SSL

#ssl config
ssl.keystore.password=***
ssl.key.password=***
ssl.keystore.location=/etc/kafka/keystore.jks
ssl.client.auth=none
ssl.endpoint.identification.algorithm=https
ssl.allow.dn.changes=true
ssl.allow.san.changes=true

#controller communicate config
sasl.mechanism.controller.protocol=PLAIN

#broker communicate config
#security.inter.broker.protocol=SASL_PLAINTEXT
inter.broker.listener.name=INTERNAL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
broker.heartbeat.interval.ms=2000
broker.session.timeout.ms=9000

#acl authorization config
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true
super.users=***

num.partitions=8
default.replication.factor=3
message.max.bytes=1073741824
min.insync.replicas=1

auto.create.topics.enable=False
delete.topic.enable=true
auto.leader.rebalance.enable=true
log.segment.bytes=1073741824
log.retention.hours=168
log.retention.check.interval.ms=300000
log.index.size.max.bytes=10485760
segment.index.bytes=10485760

replica.lag.time.max.ms=30000
fetch.max.bytes=57671680
log.preallocate=false

num.network.threads=200
listener.name.internal_ssl.num.network.threads=50
listener.name.plain_plugin_ssl.num.network.threads=50
num.io.threads=64
queued.max.requests=4000
num.replica.fetchers=8
background.threads=10
num.recovery.threads.per.data.dir=1

connections.max.idle.ms=300000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.retention.minutes=10080
group.max.session.timeout.ms=1800000
group.min.session.timeout.ms=6000

transaction.state.log.num.partitions=50
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000 {code}
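For scale, the size settings above can be put side by side with the 3G heap mentioned in the issue description. The following is only a rough sketch: the class and method names are made up for illustration, and the idea that some component might hold one buffer as large as `message.max.bytes` per partition is an assumption, not something confirmed by the dump.

```java
// Rough arithmetic on the sizes quoted in this issue.
// All names are hypothetical; nothing below is Kafka code.
class HeapBudgetSketch {
    static final long MIB = 1024L * 1024L;
    static final long GIB = 1024L * MIB;

    // How many buffers of bufferBytes fit into heapBytes (integer division).
    static long buffersThatFit(long heapBytes, long bufferBytes) {
        return heapBytes / bufferBytes;
    }

    // Converts a byte count to whole mebibytes.
    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    public static void main(String[] args) {
        long messageMaxBytes = 1_073_741_824L;     // message.max.bytes from the config above
        long socketRequestMaxBytes = 104_857_600L; // socket.request.max.bytes from the config above
        long fetchMaxBytes = 57_671_680L;          // fetch.max.bytes from the config above
        long heapBytes = 3 * GIB;                  // "3G" heap from the issue description

        System.out.println("message.max.bytes        = " + toMiB(messageMaxBytes) + " MiB");
        System.out.println("socket.request.max.bytes = " + toMiB(socketRequestMaxBytes) + " MiB");
        System.out.println("fetch.max.bytes          = " + toMiB(fetchMaxBytes) + " MiB");

        // ASSUMPTION (not confirmed): one buffer of message.max.bytes is held per partition.
        System.out.println("buffers of message.max.bytes that fit in the heap: "
                + buffersThatFit(heapBytes, messageMaxBytes));
    }
}
```

Under that assumption, three such buffers would already fill the whole heap. Independently of the assumption, `message.max.bytes` is roughly ten times larger than `socket.request.max.bytes` in this configuration.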
I tried to attach the broker memory dump package taken when the problem 
occurred, but the upload hit the attachment size limit. Even after 
compression, the dump package is still 106MB in size.
Is there any other way for us to send you the dump package?

!image-2025-06-25-13-46-30-388.png!

 



> Kafka 4.0 may have a memory leak, causing an OOM exception
> ----------------------------------------------------------
>
>                 Key: KAFKA-19427
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19427
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, group-coordinator
>    Affects Versions: 4.0.0
>            Reporter: RivenSun
>            Priority: Critical
>         Attachments: image-2025-06-23-14-16-00-554.png, 
> image-2025-06-23-14-17-34-767.png, image-2025-06-23-14-28-51-524.png, 
> image-2025-06-23-14-31-47-453.png, image-2025-06-23-15-01-32-074.png, 
> image-2025-06-23-15-04-15-708.png, image-2025-06-23-15-04-26-598.png, 
> image-2025-06-23-15-11-13-026.png, image-2025-06-23-15-33-06-851.png, 
> image-2025-06-23-15-33-26-209.png, image-2025-06-24-10-27-52-116.png, 
> image-2025-06-24-10-29-51-465.png, image-2025-06-24-10-41-40-027.png, 
> image-2025-06-24-10-43-46-826.png, image-2025-06-24-10-50-17-396.png, 
> image-2025-06-25-13-46-30-388.png
>
>
> h3. Kafka cluster configuration
> 1. Kafka version: 4.0
> 2. Cluster specification: 3 brokers and 3 controllers
> 3. JVM startup parameters:
> !image-2025-06-23-14-16-00-554.png!
> 4. JDK version:
> !image-2025-06-23-14-17-34-767.png!
> h3. Steps to reproduce the problem
> 1. In this new cluster, create a test topic: {*}test{*}. The cluster 
> contains *only this one topic*, which external users will eventually test 
> against.
> Topic config: NewTopic newTopic = new NewTopic("test", 3, (short) 1);
> 2. Start the producer and send 1,000 messages.
> 3. Start the consumers, using the earliest offset reset strategy for 
> consumption. The groupIds are rivenTest1/rivenTest2/.../rivenTest8
> 4. While the consumers were starting, some consumer groups failed to start, 
> and the coordinator brokers for those groups also threw OOM exceptions.
> Client error logs:
> {code:java}
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - 
> Successfully logged in.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.9.1
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> f745dfdcee2b9851
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1750661985923
> [main] INFO org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer 
> - [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Subscribed to 
> topic(s): test
> [main] INFO org.apache.kafka.clients.Metadata - [Consumer 
> clientId=consumer-rivenTest6-1, groupId=rivenTest6] Cluster ID: 
> 3esGOWhETi-zo2uHq7NsFg
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Discovered 
> group coordinator 18-97-25-88-k.mq.zoomdev.us:9889 (id: 2147483644 rack: null)
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] (Re-)joining 
> group
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Request joining 
> group due to: need to re-join with the given member-id: 
> consumer-rivenTest6-1-38849218-32fa-430d-b14c-d3ce7ff402c4
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] (Re-)joining 
> group
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Successfully 
> joined group with generation Generation{generationId=17, 
> memberId='consumer-rivenTest6-1-38849218-32fa-430d-b14c-d3ce7ff402c4', 
> protocol='roundrobin'}
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Finished 
> assignment for group at generation 17: 
> {consumer-rivenTest6-1-38849218-32fa-430d-b14c-d3ce7ff402c4=Assignment(partitions=[test-0,
>  test-1, test-2])}
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Request joining 
> group due to: rebalance failed due to 'Unexpected error from SyncGroup: The 
> server experienced an unexpected error when processing the request.' 
> (KafkaException)
> org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
> server experienced an unexpected error when processing the request.
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:893)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:812)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1311)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1286)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:429)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:314)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>     at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:692)
>     at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:623)
>     at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
>     at us.zoom.mq.examples.ConsumerTest.startConsumer(ConsumerTest.java:233)
>     at us.zoom.mq.examples.ConsumerTest.main(ConsumerTest.java:149)
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Member 
> consumer-rivenTest6-1-38849218-32fa-430d-b14c-d3ce7ff402c4 sending LeaveGroup 
> request to coordinator 18-97-25-88-k.mq.zoomdev.us:9889 (id: 2147483644 rack: 
> null) due to the consumer is being closed
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Resetting 
> generation and member id due to: consumer pro-actively leaving the group
> [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] Request joining 
> group due to: consumer pro-actively leaving the group
> [main] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
> - [Consumer clientId=consumer-rivenTest6-1, groupId=rivenTest6] LeaveGroup 
> request with Generation{generationId=17, 
> memberId='consumer-rivenTest6-1-38849218-32fa-430d-b14c-d3ce7ff402c4', 
> protocol='roundrobin'} failed with error: The server experienced an 
> unexpected error when processing the request.
> [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
> [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter
> [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter 
> org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
> [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info 
> kafka.consumer for consumer-rivenTest6-1 unregistered {code}
> Coordinator broker error logs:
> !image-2025-06-23-15-04-26-598.png!
>  
>  
> h3. Analysis:
> This is a brand-new Kafka 4.0 cluster with only one topic created, running 
> on JDK 17.
> Why does the broker hit an OOM so quickly when it is only producing and 
> consuming data? Is there a memory leak somewhere?
> 1. First, I used the Arthas tool to analyze the memory usage.
> !image-2025-06-23-15-04-15-708.png!
> We can see that most of the used heap memory is {*}occupied by the old 
> generation{*}, so the process is likely to hit a heap OOM as soon as it 
> needs to {color:#ff0000}*allocate a large object. 
> Note that the maximum memory we allocate to the Kafka process is actually 
> 3G, and a lot of heap space is still free. Why does it directly trigger a 
> Java heap space OOM in this case?*{color}
> {color:#172b4d}2. Next, I dumped a memory snapshot and used analysis tools 
> to see what is occupying most of the memory in the process.
> !image-2025-06-23-15-33-06-851.png!
> !image-2025-06-23-15-33-26-209.png!
> The analysis shows that almost all of the memory is held, and not released, 
> by the *coordinators* object in the *CoordinatorRuntime* class. coordinators 
> is a ConcurrentHashMap whose keys are of type TopicPartition and whose 
> values are of type CoordinatorContext.
> !image-2025-06-23-15-11-13-026.png!
> The brokers only had to serve a few newly started consumers, the topic has 
> only three partitions, and the consumer groups use no more than 10 
> partitions in total. Why does the *coordinators* object in the broker 
> process occupy such a large amount of memory without releasing it?
> Is there a problem with the broker configuration, the JDK 17 version, or the 
> JVM startup parameters, or is there a memory leak in the Kafka 4.0 
> code?{color}{color:#172b4d}Please help analyze and answer. Looking forward 
> to your reply.
> Thank you very much!{color}
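
The description above refers to "the coordinator brokers corresponding to these groups" and to a `coordinators` map keyed by TopicPartition. As a minimal sketch of how a group id relates to a `__consumer_offsets` partition, and how the coordinator id seen in the client log relates to a broker node id, the following may help. It assumes, to the best of my understanding and not verified against the 4.0 source, that the partition is the absolute value of the group id's hash code modulo `offsets.topic.num.partitions`, and that the client derives the coordinator connection id as `Integer.MAX_VALUE - nodeId`. The class and method names are hypothetical.

```java
// Sketch only: mirrors what I understand Kafka does, without using Kafka classes.
class CoordinatorLookupSketch {

    // Absolute value that maps Integer.MIN_VALUE to 0 (assumed to match Kafka's Utils.abs).
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Assumed mapping from a group id to its __consumer_offsets partition.
    static int partitionFor(String groupId, int numPartitions) {
        return abs(groupId.hashCode()) % numPartitions;
    }

    // Assumed inverse of the client's "Integer.MAX_VALUE - nodeId" connection id.
    static int nodeIdFromConnectionId(int connectionId) {
        return Integer.MAX_VALUE - connectionId;
    }

    public static void main(String[] args) {
        int offsetsTopicPartitions = 50; // offsets.topic.num.partitions from the broker config

        // Group ids rivenTest1 .. rivenTest8 from the reproduction steps.
        for (int i = 1; i <= 8; i++) {
            String groupId = "rivenTest" + i;
            System.out.println(groupId + " -> __consumer_offsets-"
                    + partitionFor(groupId, offsetsTopicPartitions));
        }

        // Coordinator id taken from the client log line "(id: 2147483644 rack: null)".
        System.out.println("coordinator node id: " + nodeIdFromConnectionId(2147483644));
    }
}
```

If these assumptions hold, each test group is coordinated by whichever broker leads the printed `__consumer_offsets` partition, which is one way to check which brokers should be affected.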



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
