George Yang created KAFKA-19046:
-----------------------------------

             Summary: Change delete cleanup policy to compact cleanup policy
                 Key: KAFKA-19046
                 URL: https://issues.apache.org/jira/browse/KAFKA-19046
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 3.7.1
         Environment: Kafka version: 3.7 
mirrormaker2 version: 3.7.1
zk version: 3.6.8
            Reporter: George Yang


The internal topics of MirrorMaker 2 (MM2) sometimes report the following error:
`
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' 
supplied via the 'offset.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of source 
connector offsets, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing source connector offsets 
and problems restarting this Connect cluster in the future. Change the 
'offset.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping 
(org.apache.kafka.connect.mirror.MirrorMaker:208)
`
 
This results in the MM2 pods in the cluster entering a CrashLoopBackOff state 
repeatedly. When changing the configuration via kafka-configs.sh, the process 
runs fine. However, as we know, the default Kafka broker configuration for 
log.cleanup.policy is set to delete, while the default cleanup policy for MM2 
is set to compact. It appears that the policy for offset.storage.topic must be 
compact, and similarly for status.storage and config.storage.
 
I want to configure the cleanup policy for these three topics to always be 
compact. I attempted to configure them in connect-mirror-maker.properties as 
shown below, but all attempts failed:
`
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact
`
or 
`
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact
`
The logs show that the properties are unknown and report a failure in topic 
creation:
`
 Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 
'mm2-offsets.cb.internal': Unknown topic config name: 
topic.properties.cleanup.policy
        at 
org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
        at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
        at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
        at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
        at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: 
Unknown topic config name: topic.properties.cleanup.policy
`
 
How can I configure the cleanup policy correctly? I reviewed the ticket 
KAFKA-17101, which describes a similar issue as mentioned by @Kaushik 
Srinivas.and @Anh Tuan Nguyen.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to