[ https://issues.apache.org/jira/browse/KAFKA-19046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
George Yang updated KAFKA-19046:
--------------------------------
    Description: 
The internal topics of MirrorMaker 2 (MM2) sometimes report the following error:
{code:java}
Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
	at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
	at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:208){code}
As a result, the MM2 pods in the cluster repeatedly enter a CrashLoopBackOff state. After the topic configuration is changed via kafka-configs.sh, the process runs fine. However, the default broker setting for log.cleanup.policy is delete, while MM2 expects its internal topics to be compacted: the cleanup policy for offset.storage.topic must be compact, and likewise for the status.storage and config.storage topics.

I want the cleanup policy for these three topics to always be compact. I attempted to configure them in connect-mirror-maker.properties as shown below, but both attempts failed:
{code:java}
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact{code}
or
{code:java}
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact{code}
The logs report the properties as unknown, and topic creation fails:
{code:java}
Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.cb.internal': Unknown topic config name: topic.properties.cleanup.policy
	at org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
	at org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
	at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
	at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.properties.cleanup.policy{code}
How can I configure the cleanup policy correctly? I reviewed KAFKA-17101, which describes a similar issue, as noted by @Kaushik Srinivas and @Anh Tuan Nguyen.
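The kafka-configs.sh workaround mentioned above can be sketched as follows. Only mm2-offsets.cb.internal appears in the report; the bootstrap address and the status/config topic names below are assumptions and should be replaced with the actual names of your MM2 internal topics:
{code:bash}
# Force cleanup.policy=compact on the MM2/Connect internal topics.
# Assumes a broker reachable at localhost:9092; the status and config
# topic names are hypothetical and may differ in your deployment.
BOOTSTRAP=localhost:9092

for topic in mm2-offsets.cb.internal mm2-status.cb.internal mm2-configs.cb.internal; do
  bin/kafka-configs.sh --bootstrap-server "$BOOTSTRAP" \
    --entity-type topics --entity-name "$topic" \
    --alter --add-config cleanup.policy=compact
done

# Verify that the override took effect
bin/kafka-configs.sh --bootstrap-server "$BOOTSTRAP" \
  --entity-type topics --entity-name mm2-offsets.cb.internal --describe{code}
This only patches topics after the broker has already auto-created them with the default delete policy; it does not prevent the race on a fresh deployment.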
> Change delete cleanup policy to compact cleanup policy
> ------------------------------------------------------
>
>                 Key: KAFKA-19046
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19046
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.1
>         Environment: Kafka version: 3.7
> mirrormaker2 version: 3.7.1
> zk version: 3.6.8
>            Reporter: George Yang
>            Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)