Bingkun created KAFKA-17771:
-------------------------------

             Summary: Local segment doesn't get deleted when exceeding retention
                 Key: KAFKA-17771
                 URL: https://issues.apache.org/jira/browse/KAFKA-17771
             Project: Kafka
          Issue Type: Bug
         Environment: Kafka Version 3.7.0 and 3.7.1
            Reporter: Bingkun
Background: We're using tiered storage with Kafka version 3.7.0 and saw the issue described below.

Locally: Normally each partition keeps about 9~11 segments locally, but sometimes, for a certain partition, the cluster seems to 'forget' to delete the local segments that are past the retention policy. As a result, the number of segments can grow excessively and the data size on the broker keeps growing as well, causing high local disk utilization. This happened to 2 brokers: one of them is the leader of the partition and the other one is the follower. After observing the issue, restarting the Kafka service on the broker that is the leader for the partition causes the out-of-retention segments to be purged afterwards.

Tiered Storage (S3): Another observation when the issue happens (the cluster "forgets" to delete the segments locally) is that the partition has far fewer segments in S3 compared to the other partitions. The "stuck" segment is uploaded to S3, though; it's just that the local segment doesn't get deleted. After I restart the broker, the segment with the issue is uploaded to S3 again and the segments that "piled up" locally are also uploaded to S3 normally.

Logs: So far we didn't find any logs that directly indicate the root cause; we only found that there is no mark-delete and no delete operation for the affected segments.
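For reference, the pile-up described above can be spotted by counting local segment files per partition directory. A minimal sketch, assuming a standard Kafka log directory layout; the threshold and helper names are mine, not part of the report (the report says ~9-11 segments per partition is normal):

```python
import os

# log.dirs=/data/kafka in the broker config below; adjust as needed.
LOG_DIR = "/data/kafka"
# Normally ~9-11 segments are kept locally per the report, so anything
# well above that suggests local retention is not being applied.
THRESHOLD = 20

def count_segments(partition_dir):
    """Count .log segment files in one partition directory."""
    return sum(1 for f in os.listdir(partition_dir) if f.endswith(".log"))

def find_stuck_partitions(log_dir=LOG_DIR, threshold=THRESHOLD):
    """Return {partition_dir_name: segment_count} for suspicious partitions."""
    stuck = {}
    for name in os.listdir(log_dir):
        path = os.path.join(log_dir, name)
        # Kafka partition directories are named <topic>-<partition>.
        if os.path.isdir(path) and "-" in name:
            n = count_segments(path)
            if n > threshold:
                stuck[name] = n
    return stuck
```

Running this periodically on each broker (or exporting the count as a metric) would catch the growth well before disk utilization becomes critical.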
Configurations (topic and broker): We keep the local retention at 20% of the total retention, and the configuration looks like this:
{code:java}
config = {
  "retention.ms" = 86400000 / 8            # 3 hours
  "local.retention.ms" = 86400000 / 8 / 5  # 20% local data
  "remote.storage.enable" = true
}{code}
{code:java}
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
num.network.threads=6
num.io.threads=16
queued.max.requests=1000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=64
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connection.timeout.ms=300000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true
controlled.shutdown.enable=true
compression.type=snappy
message.max.bytes=2000000
log.dirs=/data/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter

# Kafka Auth and ACL configurations
sasl.enabled.mechanisms=PLAIN
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required ;
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true

# Remote storage configurations
remote.log.storage.system.enable=true
remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
remote.log.storage.manager.impl.prefix=rsm.config.
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.metadata.manager.impl.prefix=rlmm.config.
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=3
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.chunk.size=4194304
rsm.config.key.prefix.mask=true
rsm.config.storage.s3.credentials.default=true
rsm.config.storage.s3.region=us-east-1
# Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
rsm.config.storage.s3.multipart.upload.part.size=167772160
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/data/cache
# Pick some cache size, 16 GiB here:
rsm.config.fetch.chunk.cache.size=17179869184
# Prefetching size, 160 MiB here:
rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
{code}

Cluster size: 70 brokers on c7g.xlarge instances.
Traffic: 7 million messages/sec and 2.6 GiB/sec.

The issue happens roughly once every 3~4 days and it's really hard to debug, so I'm posting it here in the hope we can discuss it. Thank you.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
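For clarity, the retention arithmetic in the topic config above works out as follows. The values come straight from the config; the variable names are mine:

```python
# retention.ms = 86400000 / 8: 86400000 ms is 24 h, so total retention is 3 h.
retention_ms = 86_400_000 // 8
# local.retention.ms = retention.ms / 5: 20% of total retention stays local.
local_retention_ms = retention_ms // 5

assert retention_ms == 10_800_000        # 3 hours in ms
assert local_retention_ms == 2_160_000   # 36 minutes in ms
assert local_retention_ms / retention_ms == 0.2
```

So under normal operation a segment should be deleted locally about 36 minutes after it falls out of local retention (subject to log.retention.check.interval.ms), which is consistent with the ~9-11 local segments per partition mentioned above.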