[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Rao resolved KAFKA-15005.
-------------------------------
    Resolution: Duplicate

> Status of KafkaConnect task not correct
> ---------------------------------------
>
>                 Key: KAFKA-15005
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15005
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.0.0, 3.3.2
>            Reporter: Yu Wang
>            Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found that several tasks were
> always in *UNASSIGNED* status, even though the actual tasks had already started.
> So we dumped the payload of the Kafka Connect status topic and found that the
> last two status changes were *RUNNING* followed by *UNASSIGNED*.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
> {code}
> Usually, the RUNNING status should be appended after the UNASSIGNED status,
> because the worker coordinator revokes tasks before starting new ones.
> We then checked the logs of our MM2 workers and found that, during that time,
> a task was revoked on worker-2 and started on worker-1.
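
To make the effect of the record order concrete, here is a minimal sketch of a reader that replays a status log and keeps only the last record per key. It is an illustration under assumptions, not Kafka Connect's status store: `StatusLogSketch`, `StatusRecord`, and `replay` are hypothetical names, and the real reader may apply additional checks (for example on generation or worker ID) that this sketch omits.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StatusLogSketch {

    /** Hypothetical stand-in for one status-topic record (key plus payload fields). */
    static final class StatusRecord {
        final String key;
        final String state;
        final String workerId;
        final int generation;

        StatusRecord(String key, String state, String workerId, int generation) {
            this.key = key;
            this.state = state;
            this.workerId = workerId;
            this.generation = generation;
        }
    }

    /** Replays the log in append order; the last record seen for each key wins. */
    static Map<String, StatusRecord> replay(List<StatusRecord> log) {
        Map<String, StatusRecord> latest = new LinkedHashMap<>();
        for (StatusRecord r : log) {
            latest.put(r.key, r);
        }
        return latest;
    }

    public static void main(String[] args) {
        // The two records from the dump above, in the order they were appended.
        List<StatusRecord> log = new ArrayList<>();
        log.add(new StatusRecord("task-7", "RUNNING", "worker-1", 437643));
        log.add(new StatusRecord("task-7", "UNASSIGNED", "worker-2", 437643));

        // Prints UNASSIGNED: the late record from worker-2 hides the RUNNING one.
        System.out.println(replay(log).get("task-7").state);
    }
}
```

With this naive last-write-wins view, swapping the two records would report RUNNING instead, which is why the append order matters.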
>
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, groupId=xxxx__group] Starting task task-7 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 (org.apache.kafka.connect.runtime.Worker)
> {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 (org.apache.kafka.connect.runtime.Worker)
> {code}
>
> So I think the incorrect status was caused by the revoked task finishing later
> than the newly started task, which caused the UNASSIGNED status to be appended
> to the status topic after the RUNNING status.
>
> After reading the code of DistributedHerder, I found that task revocation
> runs in a thread pool, and the revoke operation just returns after submitting
> all the callables. So I think that even within the same worker, there is no
> guarantee that the revoke operation will always finish before the new tasks
> start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
>     callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance");
> {code}
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
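
The ordering concern in the report can be sketched with plain `java.util.concurrent` primitives. This is not the DistributedHerder code, and it does not establish how `startAndStop` dispatches its callables; `RevokeOrderingSketch`, `rebalance`, and `awaitStop` are hypothetical names. It only contrasts two standard `ExecutorService` calls: `submit`, which returns immediately, and `invokeAll`, which blocks until every callable has completed. A latch forces the unlucky interleaving so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RevokeOrderingSketch {

    /**
     * Simulates stopping and then starting one task, and returns the order in
     * which the two status strings were appended to a shared list.
     */
    static List<String> rebalance(boolean awaitStop) throws Exception {
        List<String> statusLog = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Models a slow stop: without awaiting, the stop callable is held back
        // until the start has been recorded. With awaiting, the gate is open.
        CountDownLatch startRecorded = new CountDownLatch(awaitStop ? 0 : 1);

        Callable<Void> stop = () -> {
            startRecorded.await();
            statusLog.add("UNASSIGNED");
            return null;
        };

        try {
            if (awaitStop) {
                // Blocks until the stop callable has completed.
                pool.invokeAll(Collections.singletonList(stop));
            } else {
                // Returns immediately; the stop may still be in progress.
                pool.submit(stop);
            }
            statusLog.add("RUNNING"); // the "start new task" step
            startRecorded.countDown();
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return new ArrayList<>(statusLog);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(rebalance(false)); // [RUNNING, UNASSIGNED]
        System.out.println(rebalance(true));  // [UNASSIGNED, RUNNING]
    }
}
```

Note that awaiting the stop inside one worker would not by itself order writes across workers: in the logs above, the stop ran on worker-2 while the start ran on worker-1.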