Sergey Ivanov created KAFKA-16837:
-------------------------------------

             Summary: Kafka Connect fails on update connector for incorrect 
previous Config Provider tasks
                 Key: KAFKA-16837
                 URL: https://issues.apache.org/jira/browse/KAFKA-16837
             Project: Kafka
          Issue Type: Bug
          Components: connect
    Affects Versions: 3.6.1, 3.5.1, 3.8.0
            Reporter: Sergey Ivanov


Hello,

We faced an issue where it is not possible to update a connector config if the 
*previous* task config contains a ConfigProvider placeholder whose resolution 
fails with a ConfigException.

Below is a simple test case to reproduce it with FileConfigProvider, but 
actually any ConfigProvider that raises an exception when something is wrong 
with the config (e.g. the resource doesn't exist) will trigger it.
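For illustration, here is a minimal, self-contained sketch (stand-in classes, not Kafka's actual FileConfigProvider code) of how a file-backed provider surfaces a missing resource as a ConfigException rather than as a missing key:

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class MissingResourceDemo {
    // Stand-in for org.apache.kafka.common.config.ConfigException
    static class ConfigException extends RuntimeException {
        ConfigException(String message) { super(message); }
    }

    // Mimics the behavior of a file-backed provider's get(): any failure to
    // read the backing resource is reported as a ConfigException, not as an
    // absent key.
    static String lookup(String path, String key) {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            props.load(reader);
        } catch (IOException e) {
            throw new ConfigException("Could not read properties from file " + path);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        try {
            lookup("/nonexistent/provider.properties", "topics");
            System.out.println("unexpected: no exception");
        } catch (ConfigException e) {
            System.out.println("ConfigException: " + e.getMessage());
        }
    }
}
```

So any config that still references the old (now unreadable) resource will make the provider throw, even if the value is never actually needed anymore.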

*Prerequisites:*

Kafka Connect instance with config providers:

 
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
 

1. Create Kafka topic "test"
2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" 
with content:
{code:java}
topics=test
{code}
3. Create simple FileSink connector:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.10.10.10:8083"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
Looks fine.

5. Rename the file to "/opt/kafka/provider2.properties".
6. Update the connector with the new, correct file name:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeds{*} with a 200 response. 
7. Check that everything works fine:
{code:java}
{
  "local-file-sink": {
    "info": {
      "name": "local-file-sink",
      "config": {
        "connector.class": "FileStreamSink",
        "file": "/opt/kafka/test.sink.txt",
        "tasks.max": "1",
        "topics": "${file:/opt/kafka/provider2.properties:topics}",
        "name": "local-file-sink"
      },
      "tasks": [
        {
          "connector": "local-file-sink",
          "task": 0
        }
      ],
      "type": "sink"
    },
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "FAILED",
          "worker_id": "10.10.10.10:8083",
          "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
The config has been updated, but the new task has not been created. As a 
result, the connector doesn't work.

It fails with:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
 [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
Failed to reconfigure connector's tasks (local-file-sink), retrying after 
backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from 
file /opt/kafka/provider.properties
 at 
org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
 at 
org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
 at 
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
 at 
org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 at java.base/java.lang.Thread.run(Thread.java:840)  {code}
As I understand it, this happens because on connector update AbstractHerder 
tries to update the current tasks:

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]

and before doing so, the herder {+}compares the old task config with the new 
one{+}. But it doesn't compare the original (raw) values: +it tries to get the 
ConfigProvider-resolved value for the previous task+ and fails, because the 
ConfigProvider can no longer read the file referenced by the previous task 
config.

The main question: *do we really need to compare ConfigProvider-resolved* 
values there instead of comparing the original configs? 
Currently this leads to issues, as many ConfigProviders raise an exception if 
the resource is not found.
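To make the distinction concrete, here is a hypothetical, self-contained sketch (invented names, not the herder's actual code) of resolve-then-compare versus raw comparison; resolving the old config throws before the new config is even considered:

```java
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class TaskDiffDemo {
    // Hypothetical stand-in for config transformation: resolves ${file:...}
    // placeholders and, like a real ConfigProvider, throws when the backing
    // resource is gone.
    static final UnaryOperator<String> TRANSFORM = value -> {
        if (value.startsWith("${file:/opt/kafka/provider.properties")) {
            throw new RuntimeException(
                "Could not read properties from file /opt/kafka/provider.properties");
        }
        return value.replaceAll("\\$\\{[^}]*}", "test");
    };

    static Map<String, String> resolve(Map<String, String> cfg) {
        return cfg.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> TRANSFORM.apply(e.getValue())));
    }

    // Resolve-then-compare (the current behavior, sketched): transforming the
    // OLD config throws before the new config is even looked at.
    static boolean changedResolved(Map<String, String> oldCfg, Map<String, String> newCfg) {
        return !resolve(oldCfg).equals(resolve(newCfg));
    }

    // Raw comparison: diffing unresolved values never touches the provider.
    static boolean changedRaw(Map<String, String> oldCfg, Map<String, String> newCfg) {
        return !oldCfg.equals(newCfg);
    }

    public static void main(String[] args) {
        Map<String, String> oldCfg =
            Map.of("topics", "${file:/opt/kafka/provider.properties:topics}");
        Map<String, String> newCfg =
            Map.of("topics", "${file:/opt/kafka/provider2.properties:topics}");

        System.out.println("raw comparison says changed: " + changedRaw(oldCfg, newCfg));
        try {
            changedResolved(oldCfg, newCfg);
        } catch (RuntimeException e) {
            System.out.println("resolved comparison failed: " + e.getMessage());
        }
    }
}
```

Comparing the raw placeholders would correctly detect the change without ever touching the (now broken) old resource.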



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
