Loïc Monney created KAFKA-7878:
----------------------------------

             Summary: Connect Task already exists in this worker when failed to create consumer
                 Key: KAFKA-7878
                 URL: https://issues.apache.org/jira/browse/KAFKA-7878
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.0.1, 1.0.1
            Reporter: Loïc Monney


*Assumed scenario*
1. DNS is unavailable for a few minutes.
2. The Connect worker group rebalances.
3. The client can no longer resolve DNS entries, so creating the task's consumer fails.
4. The task nevertheless seems to remain registered in the worker. At the next rebalance, starting it fails with *Task already exists in this worker*, and the only way to recover is to restart the Connect process.
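The hypothesis in step 4, that a task whose initialization failed stays registered, can be illustrated with a deliberately simplified model. Everything below (`SketchWorker`, `SketchTask`, `startTaskLeaky`, `startTaskSafe`) is hypothetical and is not the Kafka Connect `Worker` code. It only shows how keeping a registration after a failed initialization reproduces the "Task already exists in this worker" symptom, and how rolling the registration back avoids it. It does not model the "Stopping task" step that appears in the log, so it says nothing about why that stop did not clear the entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a task whose initialization can fail, e.g. because
// its consumer cannot be created while DNS is unavailable.
interface SketchTask {
    void initialize();
}

// Hypothetical, heavily simplified worker; NOT org.apache.kafka.connect.runtime.Worker.
class SketchWorker {
    private final Map<String, SketchTask> tasks = new ConcurrentHashMap<>();

    // Rejects a task id that is already registered, like the second trace in the log.
    private void register(String id, SketchTask task) {
        if (tasks.putIfAbsent(id, task) != null) {
            throw new IllegalStateException("Task already exists in this worker: " + id);
        }
    }

    // Leaky pattern: the id is registered before initialization and is left in
    // the map when initialization fails, so every later start attempt is rejected.
    boolean startTaskLeaky(String id, SketchTask task) {
        register(id, task);
        try {
            task.initialize();
            return true;
        } catch (RuntimeException e) {
            return false; // stale entry for id remains in the map
        }
    }

    // Safer pattern: roll the registration back when initialization fails, so a
    // later rebalance can start the task again once DNS is back.
    boolean startTaskSafe(String id, SketchTask task) {
        register(id, task);
        try {
            task.initialize();
            return true;
        } catch (RuntimeException e) {
            tasks.remove(id, task); // removes only the entry this call added
            return false;
        }
    }

    boolean isRegistered(String id) {
        return tasks.containsKey(id);
    }
}
```

With a task whose `initialize()` throws, `startTaskLeaky("xxx-22", ...)` returns `false` but leaves `xxx-22` registered, so the next call throws `Task already exists in this worker: xxx-22`. `startTaskSafe` leaves no entry behind, and a later call with a healthy task succeeds.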

*Real log entries*
* Distributed cluster running one connector on top of Kubernetes
* Connect 2.0.1
* kafka-connect-hdfs 5.0.1
{noformat}
[2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from bootstrap.servers as DNS resolution failed for kafka.xxx.net (org.apache.kafka.clients.ClientUtils:56)
[2019-01-28 13:31:25,915] ERROR WorkerSinkTask{id=xxx-22} Task failed initialization and will not be started. (org.apache.kafka.connect.runtime.WorkerSinkTask:142)
org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
 at org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
 ... 10 more
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
 at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
 ... 13 more
[2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
[2019-01-28 13:31:25,926] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
[2019-01-28 13:31:25,927] INFO Stopping task xxx-22 (org.apache.kafka.connect.runtime.Worker:555)
[2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
[2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, groupId=xxx-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, groupId=xxx-cluster] Successfully joined group with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2019-01-28 13:31:30,746] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], taskIds=[xxx-22]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
[2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config offset 32 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
[2019-01-28 13:31:30,747] INFO Starting task xxx-22 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:872)
[2019-01-28 13:31:30,747] INFO Creating task xxx-22 (org.apache.kafka.connect.runtime.Worker:396)
[2019-01-28 13:31:30,748] ERROR Couldn't instantiate task xxx-22 because it has an invalid task configuration. This task will not execute until reconfigured. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:890)
org.apache.kafka.connect.errors.ConnectException: Task already exists in this worker: xxx-22
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:399)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
[2019-01-28 13:31:30,749] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
{noformat}
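The first trace bottoms out in "No resolvable bootstrap urls given in bootstrap.servers". The broker address kafka.xxx.net:9093 was dropped because DNS resolution failed, and with no resolvable address left the consumer could not be constructed. The sketch below reproduces that check in isolation, assuming a bootstrap list of plain host:port entries. `BootstrapCheck`, `resolvableServers` and `resolvesViaDns` are hypothetical names, and this is not the real `ClientUtils` implementation; the resolver is passed in as a `Predicate` so the logic can be exercised without an actual DNS outage.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper loosely mirroring the behaviour visible in the log:
// unresolvable entries are dropped with a warning, and if none remain the
// client cannot be built. NOT the real org.apache.kafka.clients.ClientUtils.
final class BootstrapCheck {
    private BootstrapCheck() {}

    // Default resolver: a plain lookup via InetAddress (literal IPs need no DNS).
    static boolean resolvesViaDns(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    // Keeps only the host:port entries whose host the given resolver accepts.
    static List<String> resolvableServers(List<String> servers, Predicate<String> resolves) {
        List<String> kept = new ArrayList<>();
        for (String server : servers) {
            int colon = server.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Invalid host:port entry: " + server);
            }
            String host = server.substring(0, colon);
            if (resolves.test(host)) {
                kept.add(server);
            } else {
                System.err.println("Removing server " + server
                        + " from bootstrap.servers as DNS resolution failed for " + host);
            }
        }
        if (kept.isEmpty()) {
            // Same message as the ConfigException in the log, but a plain JDK exception here.
            throw new IllegalStateException("No resolvable bootstrap urls given in bootstrap.servers");
        }
        return kept;
    }
}
```

For example, `resolvableServers(List.of("kafka.xxx.net:9093"), BootstrapCheck::resolvesViaDns)` throws while the name cannot be resolved. Because the check runs only when the consumer is constructed, a brief DNS outage that coincides with a rebalance is enough to fail task initialization, which matches the timing in the log.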



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
