[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Ivanov updated KAFKA-16837:
----------------------------------
    Description: 
Hello,

We faced an issue where it is not possible to update a connector's config if the 
*previous* task config contains a ConfigProvider reference with an invalid value, 
which leads to a ConfigException.

Below is a simple test case that reproduces it with FileConfigProvider, but any 
ConfigProvider that raises an exception when something is wrong with the config 
(e.g. the resource doesn't exist) is affected.
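For illustration, any provider shaped like the hypothetical sketch below triggers this failure mode whenever its backing resource disappears (this class is not part of Kafka; FileConfigProvider behaves the same way for a missing file):
{code:java}
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Illustrative only: throws ConfigException as soon as the backing
// resource cannot be read, which aborts config transformation.
public class StrictFileProvider implements ConfigProvider {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConfigData get(String path) {
        return get(path, Set.of());
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        Properties props = new Properties();
        try {
            props.load(Files.newBufferedReader(Path.of(path)));
        } catch (Exception e) {
            throw new ConfigException("Could not read resource " + path);
        }
        Map<String, String> data = new HashMap<>();
        for (String key : keys)
            data.put(key, props.getProperty(key));
        return new ConfigData(data);
    }

    @Override
    public void close() { }
}
{code}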

*Prerequisites:*

A Kafka Connect instance with a file config provider configured:
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}

1. Create a Kafka topic "test".
2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" 
with the content:
{code:java}
topics=test
{code}
3. Create a simple FileStreamSink connector:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.10.10.10:8083"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
Looks fine.

5. Rename the file to "/opt/kafka/provider2.properties".
6. Update the connector with the new, correct file name:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeds{*} with HTTP 200.
7. Check again that everything works fine:
{code:java}
{
  "local-file-sink": {
    "info": {
      "name": "local-file-sink",
      "config": {
        "connector.class": "FileStreamSink",
        "file": "/opt/kafka/test.sink.txt",
        "tasks.max": "1",
        "topics": "${file:/opt/kafka/provider2.properties:topics}",
        "name": "local-file-sink"
      },
      "tasks": [
        {
          "connector": "local-file-sink",
          "task": 0
        }
      ],
      "type": "sink"
    },
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "FAILED",
          "worker_id": "10.10.10.10:8083",
          "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
The config has been updated, but a new task has not been created, and as a result 
the connector doesn't work. Note that the failed task still references the *old* 
file, "/opt/kafka/provider.properties".

It fails with:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
 at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
 at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
 at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
 at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
 at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 at java.base/java.lang.Thread.run(Thread.java:840)
{code}
As I understand it, this happens because on connector update AbstractHerder tries 
to update the current tasks:

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]

Before doing so, the herder {+}compares the old task configs with the new ones{+}. 
But it doesn't compare the original (raw) values: +it resolves the ConfigProvider 
placeholders in the previous task configs+ and fails, because the ConfigProvider 
can no longer read the file referenced by the previous tasks.
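A minimal, self-contained sketch of that failing step, assuming only the kafka-clients jar on the classpath. It drives the same ConfigTransformer that ClusterConfigState.taskConfig() delegates to in the stack trace above; the wiring around it is simplified:
{code:java}
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.FileConfigProvider;

import java.util.Collections;
import java.util.Map;

public class StaleTaskConfigRepro {
    public static void main(String[] args) {
        // Same provider registration as the worker's "config.providers=file".
        FileConfigProvider fileProvider = new FileConfigProvider();
        fileProvider.configure(Collections.emptyMap());
        ConfigTransformer transformer =
                new ConfigTransformer(Collections.singletonMap("file", fileProvider));

        // The *previous* task config still points at the renamed file.
        Map<String, String> staleTaskConfig = Collections.singletonMap(
                "topics", "${file:/opt/kafka/provider.properties:topics}");

        // Throws ConfigException("Could not read properties from file ..."):
        // the herder hits this while resolving the old task config for the
        // comparison, before the new (valid) config is ever considered.
        transformer.transform(staleTaskConfig);
    }
}
{code}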

The main question: *do we really need to compare the ConfigProvider-resolved* 
values there, instead of comparing the original configs? 
As it stands this causes real problems, since most ConfigProviders raise an 
exception when the referenced resource is not found.
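For illustration, a raw-value comparison could look like the hedged sketch below (names are illustrative, not the actual Kafka internals); the trade-off is that it would not notice when the provider-backed values themselves change while the placeholders stay the same:
{code:java}
import java.util.List;
import java.util.Map;

public class RawTaskConfigComparison {
    /**
     * Illustrative alternative: compare the raw (unresolved) task configs.
     * Placeholders like "${file:...:topics}" are treated as opaque strings,
     * so no ConfigProvider is invoked and a missing resource cannot fail
     * the comparison itself.
     */
    static boolean taskConfigsChanged(List<Map<String, String>> rawOldConfigs,
                                      List<Map<String, String>> rawNewConfigs) {
        return !rawOldConfigs.equals(rawNewConfigs);
    }
}
{code}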

 

As a workaround we can delete and re-create the connector instead of updating it. 
But there is one case where this doesn't help: KAFKA-16838
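A sketch of that workaround for the connector from this reproduction, using the same REST calls as above:
{code:java}
DELETE /connectors/local-file-sink

PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}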

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16837
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.5.1, 3.6.1, 3.8.0
>            Reporter: Sergey Ivanov
>            Priority: Major
>         Attachments: kafka_connect_config.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
