Bingkun created KAFKA-17771:
-------------------------------

             Summary: Local segment doesn't get deleted when exceeding 
retention
                 Key: KAFKA-17771
                 URL: https://issues.apache.org/jira/browse/KAFKA-17771
             Project: Kafka
          Issue Type: Bug
         Environment: Kafka Version 3.7.0 and 3.7.1
            Reporter: Bingkun


Background:

We're using tiered storage with Kafka version 3.7.0 and saw an issue as 
described below.


Locally:

Normally each partition has about 9~11 segments stored locally, but 
sometimes, for a certain partition, the cluster seems to 'forget' to delete the 
local segments that are outside the retention policy. As a result, the number of 
segments can grow excessively and the broker's data size keeps growing as well, 
causing high local disk utilization. It happened to 2 brokers: one of them is 
the leader of the partition and the other is the follower. After observing the 
issue, restarting the Kafka service on the broker that is the leader for the 
partition causes the out-of-retention segments to be purged.

Tiered Storage: (S3)

Another observation when the issue happens (the cluster "forgets" to delete the 
segment locally) is that the partition has far fewer segments in S3 than the 
other partitions. The "stuck" segment is uploaded to S3 though; it's just the 
local segment that doesn't get deleted. After I restarted the broker, the 
segment with the issue was uploaded to S3 again and the segments that had 
"piled up" locally were also uploaded to S3 normally.

Logs:

For the logs, we didn't find anything that directly indicates the root cause; 
we only found that there is no mark-for-deletion and no delete operation for 
the segment.

Configurations: (topic and broker configuration)

We keep the local retention at 20% of the total retention, and the 
configuration looks like this:
{code:java}
config = { 
    "retention.ms" = 86400000 / 8 # 3 hours 
    "local.retention.ms" = 86400000 / 8 / 5 # 20% local data
    "remote.storage.enable" = true 
}{code}
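For clarity, the retention arithmetic above works out as follows (a quick sanity-check sketch, not part of our deployment):

{code:python}
# Sanity check of the retention arithmetic used in the topic config above.
DAY_MS = 86_400_000                      # 24 hours in milliseconds

retention_ms = DAY_MS // 8               # total retention: 10_800_000 ms = 3 hours
local_retention_ms = retention_ms // 5   # local retention: 2_160_000 ms = 36 minutes (20%)

print(retention_ms, local_retention_ms)  # → 10800000 2160000
{code}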
 
{code:java}
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
num.network.threads=6
num.io.threads=16
queued.max.requests=1000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=64
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connection.timeout.ms=300000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true
controlled.shutdown.enable=true
compression.type=snappy
message.max.bytes=2000000
log.dirs=/data/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
# Kafka Auth and ACL configurations
sasl.enabled.mechanisms=PLAIN
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required;
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
# Remote storage configurations
remote.log.storage.system.enable=true
remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
remote.log.storage.manager.impl.prefix=rsm.config.
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.metadata.manager.impl.prefix=rlmm.config.
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=3
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.chunk.size=4194304
rsm.config.key.prefix.mask=true
rsm.config.storage.s3.credentials.default=true
rsm.config.storage.s3.region=us-east-1
# Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
rsm.config.storage.s3.multipart.upload.part.size=167772160
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/data/cache
# Pick some cache size, 16 GiB here:
rsm.config.fetch.chunk.cache.size=17179869184
# Prefetching size, 160 MiB here:
rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
{code}
Cluster size:

70 brokers with c7g.xlarge instance.

Traffic: 7 million messages/sec and 2.6GiB/sec.

The issue happens roughly once every 3~4 days and it's really hard to debug, 
so I'm posting it here in the hope of some discussion. Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
