[ https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Ivanov updated KAFKA-16838:
----------------------------------
    Description: 
Hello,

When creating a connector, we got an error from one of our ConfigProviders about a nonexistent resource, although we did not try to set that resource as a config value:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
	at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
	at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
	at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
	at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
	at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
{code}
After investigating, we found that a few months ago this cloud had a connector with the same name and a different value for the config provider. That connector was removed, but for some reason, when we tried to create a connector with the same name months later, AbstractHerder tried to update tasks from the previous connector.

I use FileConfigProvider as an example, but any ConfigProvider that can raise an exception when something is wrong with the config (for example, the referenced resource doesn't exist) will do.

We continued our investigation and found https://issues.apache.org/jira/browse/KAFKA-7745, which says that Connect doesn't tombstone commit and task messages in the Kafka config topic. As far as we remember, the config topic is `compact`, *which means commit and task messages are stored indefinitely (months or years after the connector is removed)*, while tombstones for connector messages are cleaned up according to the {{delete.retention.ms}} property. That impacts later creation of connectors with the same name.

We didn't investigate the cause in ConfigClusterStore or how to avoid the issue, because we would {+}like to ask{+}: would it be better to fix KAFKA-7745 and send tombstones for commit and task messages, as Connect already does for connector and target messages?

I have a synthetic test case to reproduce the error if needed.

This is linked with https://issues.apache.org/jira/browse/KAFKA-16837 but it's not the same issue.

As a workaround, we can remove the connector one more time to get a *tombstone* message for the connector in the config topic.

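To make the retention behaviour described above easier to follow, here is a minimal, self-contained sketch that simulates a compacted topic in memory. It is not Kafka Connect code: the class {{ConfigTopicCompactionSketch}}, the record keys, the placeholder values and the timestamps are all assumptions chosen for illustration, and the compaction logic is deliberately simplified (latest record per key wins, and tombstones older than {{delete.retention.ms}} are dropped).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation only; names, keys and values are hypothetical.
class ConfigTopicCompactionSketch {

    /** One record in the simulated config topic; a null value is a tombstone. */
    record ConfigRecord(String key, String value, long timestampMs) {}

    /**
     * Simplified log compaction: keep the latest record per key, then drop
     * tombstones that are older than deleteRetentionMs.
     */
    static Map<String, String> compact(List<ConfigRecord> log, long nowMs, long deleteRetentionMs) {
        Map<String, ConfigRecord> latest = new LinkedHashMap<>();
        for (ConfigRecord r : log) {
            latest.put(r.key(), r); // a later record replaces an earlier one with the same key
        }
        Map<String, String> survivors = new LinkedHashMap<>();
        for (ConfigRecord r : latest.values()) {
            boolean expiredTombstone =
                    r.value() == null && nowMs - r.timestampMs() > deleteRetentionMs;
            if (!expiredTombstone) {
                survivors.put(r.key(), r.value());
            }
        }
        return survivors;
    }

    /** A connector that was created and later deleted, as in the report. */
    static List<ConfigRecord> deletedConnectorLog() {
        List<ConfigRecord> log = new ArrayList<>();
        log.add(new ConfigRecord("connector-local-file-sink", "{old connector config}", 0));
        log.add(new ConfigRecord("target-state-local-file-sink", "{old target state}", 0));
        log.add(new ConfigRecord("task-local-file-sink-0", "{old task config}", 1));
        log.add(new ConfigRecord("commit-local-file-sink", "{old commit}", 2));
        // Deletion: tombstones are written for the connector and target messages only.
        log.add(new ConfigRecord("connector-local-file-sink", null, 10));
        log.add(new ConfigRecord("target-state-local-file-sink", null, 10));
        return log;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000; // assumed delete.retention.ms of one day
        Map<String, String> survivors = compact(deletedConnectorLog(), 90 * dayMs, dayMs);
        // prints [task-local-file-sink-0, commit-local-file-sink]
        System.out.println(survivors.keySet());
    }
}
```

In this simplified model, once the connector tombstone has aged out, only the task and commit records of the removed connector remain, which matches the situation described above where a new connector with the same name runs into the old task configuration.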
  was:
Hello,

When creating a connector, we got an error from one of our ConfigProviders about a nonexistent resource, although we did not try to set that resource as a config value:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
	at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
	at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
	at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
	at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
	at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
{code}
After investigating, we found that a few months ago this cloud had a connector with the same name and a different value for the config provider. That connector was removed, but for some reason, when we tried to create a connector with the same name months later, AbstractHerder tried to update tasks from the previous connector.

I use FileConfigProvider as an example, but any ConfigProvider that can raise an exception when something is wrong with the config (for example, the referenced resource doesn't exist) will do.

We continued our investigation and found https://issues.apache.org/jira/browse/KAFKA-7745, which says that Connect doesn't tombstone commit and task messages in the Kafka config topic. As far as we remember, the config topic is `compact`, *which means commit and task messages are stored indefinitely (months or years after the connector is removed)* and impact later creation of connectors with the same name.

We didn't investigate the cause in ConfigClusterStore or how to avoid the issue, because we would {+}like to ask{+}: would it be better to fix KAFKA-7745 and send tombstones for commit and task messages, as Connect already does for connector and target messages?

I have a synthetic test case to reproduce the error if needed.

This is linked with https://issues.apache.org/jira/browse/KAFKA-16837 but it's not the same issue.

> Kafka Connect loads old tasks from removed connectors
> -----------------------------------------------------
>
>                 Key: KAFKA-16838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16838
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.5.1, 3.6.1, 3.8.0
>            Reporter: Sergey Ivanov
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)