[ https://issues.apache.org/jira/browse/KAFKA-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantine Karantasis resolved KAFKA-10763.
--------------------------------------------
Resolution: Fixed

> Task already exists error on same worker due to skip removal of tasks
> ---------------------------------------------------------------------
>
> Key: KAFKA-10763
> URL: https://issues.apache.org/jira/browse/KAFKA-10763
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.3.0
> Reporter: Shao Wang
> Assignee: Greg Harris
> Priority: Major
> Fix For: 2.3.2, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> In our production environment, after starting two Kafka Connect workers, the
> leader bounces between worker1 and worker2 during the first couple of minutes,
> and many tasks on worker2 throw a "Task already exists in this worker"
> exception.
>
> The sequence of events on worker2 (hostname: sinkdp2):
> gen3: assign task 1; start task 1
> gen4: assign task 1
> gen5: assign task 1
> gen6: skip stopping and removing task 1 because the previous rebalance was
> unresolved; revoke task 1
> gen7: assign task 1; start task 1 (fails with "Task already exists" error)
>
> Worker1 (hostname: sinkdp1)
> {code:java}
> 03:36:07,340 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:10,460 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 1 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:36:10,694 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-5][DistributedHerder.java:1073]
> 03:36:10,979 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-5][Worker.java:426]
> 03:36:37,692 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:37,806 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-5][Worker.java:702]
> 03:40:09,721 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:40:09,722 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 2 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
> 03:40:09,722 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:41:10,650 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:41:10,650 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 4 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask
> 03:41:10,651 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:42:10,815 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 5 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:42:10,953 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-8][DistributedHerder.java:1073]
> 03:42:10,953 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-8][Worker.java:426]
> 03:42:29,429 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:43:28,336 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-2][Worker.java:702]
> 03:46:05,804 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:46:05,806 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 6 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
> 03:46:05,806 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:47:06,564 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> {code}
>
> Worker2 (hostname: sinkdp2)
> {code:java}
> 03:36:35,984 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:37,780 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 2 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> [Dist
> 03:37:40,789 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:37:40,916 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 3 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:37:41,151 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-1][DistributedHerder.java:1073]
> 03:37:41,507 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-1][Worker.java:426]
> 03:40:13,254 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:42:27,376 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:42:27,377 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 4 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:42:27,378 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:43:28,190 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:43:28,191 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 6 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-d
> 03:43:28,191 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:44:28,358 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 7 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:44:28,692 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-7][DistributedHerder.java:1073]
> kafka.connect.errors.ConnectException: Task already exists in this worker:
> dp-hive-sink-connector-dptask_475_22-0
> 03:46:07,401 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:48:07,024 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:48:07,246 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 8 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:48:07,246 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 10 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-2a498edc-a517-457b-b982-c84cc9ff4521',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-
> 03:49:07,447 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:50:07,677 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 11 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-d0be6a23-8dfa-4289-a818-8a655effa48d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpt
> 03:50:08,079 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-3][DistributedHerder.java:1073]
> kafka.connect.errors.ConnectException: Task already exists in this worker:
> dp-hive-sink-connector-dptask_475_22-0
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
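The sequence of events reported for worker2 can be replayed against a deliberately simplified model. This is a hypothetical Java sketch, not the Kafka Connect source and not the committed fix: `Worker`, `Herder`, `onAssignment`, `replay` and the `skipStopWhenUnresolved` flag are illustrative names, and `IllegalStateException` stands in for Connect's `ConnectException`. It assumes, as the worker2 log suggests, that when the previous rebalance was unresolved a revoked task is dropped from the herder's view of its assignment without being stopped in the worker, so a later reassignment tries to start a task the worker still holds.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the KAFKA-10763 sequence.
// Names are illustrative only; this is not the Kafka Connect source or its fix.
class SkippedRevocationDemo {

    // Stands in for the worker's registry of running tasks.
    static class Worker {
        private final Set<String> running = new HashSet<>();

        void startTask(String id) {
            if (!running.add(id)) {
                // Same message text as the ConnectException in the worker2 log.
                throw new IllegalStateException("Task already exists in this worker: " + id);
            }
        }

        void stopTask(String id) {
            running.remove(id);
        }
    }

    // Stands in for the herder's view of which tasks this worker owns.
    static class Herder {
        private final Worker worker = new Worker();
        private final Set<String> owned = new HashSet<>();
        private final boolean skipStopWhenUnresolved;
        private final List<String> errors = new ArrayList<>();

        Herder(boolean skipStopWhenUnresolved) {
            this.skipStopWhenUnresolved = skipStopWhenUnresolved;
        }

        // Applies one generation: handle revocations first, then start newly assigned tasks.
        void onAssignment(List<String> assigned, List<String> revoked, boolean lastRebalanceResolved) {
            for (String id : revoked) {
                if (lastRebalanceResolved || !skipStopWhenUnresolved) {
                    worker.stopTask(id);
                }
                // The herder drops the task either way, so only the worker may still hold it.
                owned.remove(id);
            }
            for (String id : assigned) {
                if (owned.add(id)) { // new from the herder's point of view
                    try {
                        worker.startTask(id);
                    } catch (IllegalStateException e) {
                        errors.add(e.getMessage());
                    }
                }
            }
        }

        List<String> errors() {
            return errors;
        }
    }

    // Replays worker2's generations 3, 4, 6 and 7 for a single task.
    static List<String> replay(boolean skipStopWhenUnresolved) {
        String task = "dp-hive-sink-connector-dptask_475_22-0";
        Herder herder = new Herder(skipStopWhenUnresolved);
        herder.onAssignment(List.of(task), List.of(), true);  // gen3: assigned, started
        herder.onAssignment(List.of(task), List.of(), true);  // gen4: still assigned, nothing to start
        herder.onAssignment(List.of(), List.of(task), false); // gen6: revoked after an unresolved rebalance
        herder.onAssignment(List.of(task), List.of(), true);  // gen7: assigned again, start attempted
        return herder.errors();
    }

    public static void main(String[] args) {
        System.out.println("skip stop when unresolved: " + replay(true));
        System.out.println("always stop revoked tasks: " + replay(false));
    }
}
```

With `replay(true)` the model records a single error at generation 7, `Task already exists in this worker: dp-hive-sink-connector-dptask_475_22-0`, matching the worker2 log; with `replay(false)` the revoked task is stopped at generation 6 and restarts cleanly. The second mode illustrates one way to keep the herder and worker views consistent and is an assumption, not necessarily the change shipped in the listed Fix For versions.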