Sergey Ivanov created KAFKA-16837:
-------------------------------------

             Summary: Kafka Connect fails on update connector for incorrect previous Config Provider tasks
                 Key: KAFKA-16837
                 URL: https://issues.apache.org/jira/browse/KAFKA-16837
             Project: Kafka
          Issue Type: Bug
          Components: connect
    Affects Versions: 3.6.1, 3.5.1, 3.8.0
            Reporter: Sergey Ivanov
Hello,

We faced an issue where it is not possible to update a connector config if the *previous* task config contains a ConfigProvider reference that can no longer be resolved and leads to a ConfigException. Below is a simple test case that reproduces it with FileConfigProvider, but any ConfigProvider that raises an exception when something is wrong with its config (for example, the referenced resource doesn't exist) is affected.

*Prerequisites:* a Kafka Connect instance with config providers:
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
1. Create the Kafka topic "test".

2. On the Kafka Connect instance create the file "/opt/kafka/provider.properties" with the content:
{code:java}
topics=test
{code}
3. Create a simple FileSink connector:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.10.10.10:8083"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
Looks fine.

5. Rename the file to "/opt/kafka/provider2.properties".

6. Update the connector with the new, correct file name:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeds{*} with a 200 response.

7. Check the status again:
{code:java}
{
  "local-file-sink": {
    "info": {
      "name": "local-file-sink",
      "config": {
        "connector.class": "FileStreamSink",
        "file": "/opt/kafka/test.sink.txt",
        "tasks.max": "1",
        "topics": "${file:/opt/kafka/provider2.properties:topics}",
        "name": "local-file-sink"
      },
      "tasks": [
        {
          "connector": "local-file-sink",
          "task": 0
        }
      ],
      "type": "sink"
    },
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "FAILED",
          "worker_id": "10.10.10.10:8083",
          "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
The config has been updated, but the new task has not been created, so the connector doesn't work. It fails with:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
	at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
	at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
	at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
	at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
	at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
{code}
As I understand it, this happens because on connector update AbstractHerder tries to update the current tasks: [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051] and, before doing so, the herder {+}compares the old task config with the new one{+}. But it doesn't compare the original (raw) values: +it asks the ConfigProvider to resolve the value for the previous task+ and fails, because the ConfigProvider can no longer read the file referenced by the previous task. The main question is: *do we really need to compare ConfigProvider-resolved values* there instead of comparing the original configs? At the moment this causes problems, because many ConfigProviders raise an exception when the resource is not found.
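For illustration, here is a minimal standalone sketch (not the herder code itself; the class name is made up and the file paths are the ones from the steps above) of why comparing resolved values is fragile: the raw, unresolved reference strings can be compared without touching the provider at all, while resolving the *old* reference through FileConfigProvider fails with the same ConfigException as in the stack trace above once the file has been renamed:
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class RawVsResolvedConfigSketch {

    public static void main(String[] args) throws Exception {
        // Raw (unresolved) task configs, i.e. the values as submitted via the REST API.
        Map<String, String> oldRawConfig =
                Map.of("topics", "${file:/opt/kafka/provider.properties:topics}");
        Map<String, String> newRawConfig =
                Map.of("topics", "${file:/opt/kafka/provider2.properties:topics}");

        // Comparing the raw values never touches the provider,
        // so it works even though the old file has been renamed.
        System.out.println("raw configs differ: " + (!oldRawConfig.equals(newRawConfig)));

        // Resolving the old reference requires the old file to still exist:
        // FileConfigProvider throws ConfigException when it cannot read the file,
        // which is what the herder runs into while diffing the task configs.
        FileConfigProvider provider = new FileConfigProvider();
        provider.configure(Collections.emptyMap());
        try {
            provider.get("/opt/kafka/provider.properties", Collections.singleton("topics"));
        } catch (ConfigException e) {
            System.out.println("resolving old reference failed: " + e.getMessage());
        } finally {
            provider.close();
        }
    }
}
{code}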