George Yang created KAFKA-19046:
-----------------------------------

             Summary: Change delete cleanup policy to compact cleanup policy
                 Key: KAFKA-19046
                 URL: https://issues.apache.org/jira/browse/KAFKA-19046
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 3.7.1
         Environment: Kafka version: 3.7
                      mirrormaker2 version: 3.7.1
                      zk version: 3.6.8
            Reporter: George Yang

MirrorMaker 2 (MM2) sometimes fails on startup with the following error about its internal topics:

`
Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
    at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
    at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:208)
`

As a result, the MM2 pods in the cluster repeatedly enter a CrashLoopBackOff state. After changing the topic's cleanup policy with kafka-configs.sh, the process runs fine. However, the default Kafka broker setting for log.cleanup.policy is delete, while the default cleanup policy for MM2 is compact. It appears that the topic named by offset.storage.topic must use the compact policy, and the same applies to the status.storage and config.storage topics.

I want the cleanup policy for these three topics to always be compact. I attempted to configure them in connect-mirror-maker.properties as shown below, but all attempts failed:

`
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact
`

or

`
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact
`

The logs show that the properties are unknown and report a failure in topic creation:

`
Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.cb.internal': Unknown topic config name: topic.properties.cleanup.policy
    at org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
    at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
    at org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
    at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
    at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.properties.cleanup.policy
`

How can I configure the cleanup policy correctly? I reviewed ticket KAFKA-17101, which describes a similar issue, as mentioned by @Kaushik Srinivas and @Anh Tuan Nguyen.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
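
For reference, the kafka-configs.sh workaround mentioned in the description could look roughly like the sketch below. It is not a fix for the underlying problem, only an illustration of switching an existing internal topic to compact by hand. The broker address `localhost:9092`, the `BOOTSTRAP` and `KAFKA_CONFIGS` variables, and the helper names `set_compact` and `describe_topic` are assumptions made for illustration; only the topic name `mm2-offsets.cb.internal` comes from the logs above.

```shell
# Assumed values, not taken from the report: adjust to your environment.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"            # hypothetical broker address
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs.sh}"  # path to the Kafka CLI tool

# Switch an existing topic to cleanup.policy=compact.
set_compact() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" \
    --alter --entity-type topics --entity-name "$1" \
    --add-config cleanup.policy=compact
}

# Show the topic-level config overrides, to check the policy afterwards.
describe_topic() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" \
    --describe --entity-type topics --entity-name "$1"
}
```

Calling `set_compact mm2-offsets.cb.internal` followed by `describe_topic mm2-offsets.cb.internal` would apply and then confirm the override. The status and config topics would need the same treatment under whatever names they have in the affected cluster.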