[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luying Liu updated KAFKA-8676:
------------------------------
    Description: 
When a connector is added or a connector configuration is changed, Kafka 
Connect 2.3.0 stops all existing tasks and then starts all tasks, both the new 
ones and the existing ones. This is unnecessary: only the new connector and its 
tasks need to be started, because rebalancing can be applied to both running 
and suspended tasks. The following patch fixes this problem so that only the 
new tasks and connectors are started.

The problem lies in the KafkaConfigBackingStore.ConsumeCallback.onCompletion() 
function (line 623 in KafkaConfigBackingStore.java). When record.key() starts 
with "commit-", the tasks are being committed and the deferred tasks are 
processed. Some new tasks are added to 'updatedTasks' (line 623 in 
KafkaConfigBackingStore.java), and 'updatedTasks' is passed to updateListener 
to complete the task configuration update (line 638 in 
KafkaConfigBackingStore.java). In updateListener.onTaskConfigUpdate(), the 
entries of 'updatedTasks' are added to the member variable 'taskConfigUpdates' 
of class DistributedHerder (line 1295 in DistributedHerder.java).
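The hand-off above can be illustrated with a minimal sketch. It assumes simplified stand-ins rather than the real Kafka Connect classes: TaskId, ConfigStoreModel, writeTaskConfig(), onCommit() and the Consumer listener are hypothetical names for ConnectorTaskId, the "commit-" branch of ConsumeCallback and updateListener.onTaskConfigUpdate(). It models the behavior reported here: on a commit record the deferred configs are merged into the full task-config map, and every known task ID is then reported as updated. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Simplified, hypothetical model of the commit branch; not the Kafka Connect source.
class ConfigStoreModel {
    // Stand-in for ConnectorTaskId: connector name plus task index.
    record TaskId(String connector, int task) {}

    // Every committed task config, across all connectors.
    final Map<TaskId, Map<String, String>> taskConfigs = new HashMap<>();
    // Task configs written per connector but not yet committed.
    final Map<String, Map<TaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
    // Stand-in for updateListener.onTaskConfigUpdate().
    final Consumer<Set<TaskId>> updateListener;

    ConfigStoreModel(Consumer<Set<TaskId>> updateListener) {
        this.updateListener = updateListener;
    }

    // A task config record: held back until the connector's commit record arrives.
    void writeTaskConfig(TaskId id, Map<String, String> config) {
        deferredTaskUpdates.computeIfAbsent(id.connector(), c -> new HashMap<>()).put(id, config);
    }

    // A "commit-<connector>" record, modeling the behavior reported in this issue.
    void onCommit(String connector) {
        Set<TaskId> updatedTasks = new HashSet<>();
        Map<TaskId, Map<String, String>> deferred = deferredTaskUpdates.remove(connector);
        if (deferred != null) {
            taskConfigs.putAll(deferred);
            // Every known task is reported, not just the ones in 'deferred'.
            updatedTasks.addAll(taskConfigs.keySet());
        }
        updateListener.accept(updatedTasks);
    }

    // Connector "a" (two tasks) is committed first, then a new connector "b" (one task).
    static Set<TaskId> updatesReportedForNewConnector() {
        Set<TaskId> lastUpdate = new HashSet<>();
        ConfigStoreModel store = new ConfigStoreModel(ids -> {
            lastUpdate.clear();
            lastUpdate.addAll(ids);
        });
        store.writeTaskConfig(new TaskId("a", 0), Map.of());
        store.writeTaskConfig(new TaskId("a", 1), Map.of());
        store.onCommit("a");
        store.writeTaskConfig(new TaskId("b", 0), Map.of());
        store.onCommit("b");
        return lastUpdate;
    }

    public static void main(String[] args) {
        // prints 3: a-0 and a-1 are reported even though only b-0 is new
        System.out.println(updatesReportedForNewConnector().size());
    }
}
```

In this model, committing the new connector "b" reports the tasks of the unrelated connector "a" as updated as well.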

In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' in 
updateConfigsWithIncrementalCooperative() (line 445 in DistributedHerder.java). 
'taskConfigUpdatesCopy' is then passed to 
processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in 
DistributedHerder.java), which uses it to find the connectors whose tasks must 
be stopped (line 492 in DistributedHerder.java) and, from those, the tasks to 
stop, which turn out to be all the tasks. The worker thread performs the actual 
stop (line 499 in DistributedHerder.java).
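The herder-side steps can be sketched as follows, as a simplified model under stated assumptions rather than the actual DistributedHerder code. HerderModel, TaskId, takeTaskConfigUpdates(), tasksToStop() and localTasks are hypothetical names; the snapshot-and-clear under a lock is an assumed way of handing the set between the two threads. The key point it shows is that a single updated task ID is enough to put its whole connector into connectorsWhoseTasksToStop, so every local task of that connector is stopped. Records require Java 16 or later.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical model of the herder-side handling; not the Kafka Connect source.
class HerderModel {
    record TaskId(String connector, int task) {}

    private final Object lock = new Object();
    // Filled by the config-store callback thread.
    private final Set<TaskId> taskConfigUpdates = new HashSet<>();
    // Tasks currently assigned to this worker.
    final Set<TaskId> localTasks = new HashSet<>();

    // Callback thread: record which tasks had their configs updated.
    void onTaskConfigUpdate(Collection<TaskId> updatedTasks) {
        synchronized (lock) {
            taskConfigUpdates.addAll(updatedTasks);
        }
    }

    // Herder thread: snapshot the pending updates and clear them under the lock.
    Set<TaskId> takeTaskConfigUpdates() {
        synchronized (lock) {
            Set<TaskId> copy = new HashSet<>(taskConfigUpdates);
            taskConfigUpdates.clear();
            return copy;
        }
    }

    // Herder thread: every local task of any connector named in the snapshot is stopped.
    List<TaskId> tasksToStop(Set<TaskId> taskConfigUpdatesCopy) {
        Set<String> connectorsWhoseTasksToStop = taskConfigUpdatesCopy.stream()
                .map(TaskId::connector)
                .collect(Collectors.toSet());
        return localTasks.stream()
                .filter(id -> connectorsWhoseTasksToStop.contains(id.connector()))
                .sorted(Comparator.comparing(TaskId::connector).thenComparingInt(TaskId::task))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        HerderModel herder = new HerderModel();
        herder.localTasks.addAll(List.of(new TaskId("a", 0), new TaskId("a", 1), new TaskId("c", 0)));
        // An update naming only a-1 still stops both of connector "a"'s local tasks.
        herder.onTaskConfigUpdate(List.of(new TaskId("a", 1)));
        // prints [TaskId[connector=a, task=0], TaskId[connector=a, task=1]]
        System.out.println(herder.tasksToStop(herder.takeTaskConfigUpdates()));
    }
}
```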

In the original code, all the tasks are added to 'updatedTasks' (line 623 in 
KafkaConfigBackingStore.java), so every active connector ends up in the 
'connectorsWhoseTasksToStop' set and every task ends up in the 'tasksToStop' 
list. As a result, all the tasks are stopped and then restarted.

Adding only the deferred tasks to 'updatedTasks' therefore avoids stopping and 
restarting tasks that do not need it.
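The effect of the proposed change can be compared end to end in a small hypothetical model. It assumes simplified stand-ins (TaskId, updatedTasksOnCommit(), tasksToStop(), stopsWhenAddingConnector() are illustrative names, not Kafka Connect APIs) and combines the two steps described above: the commit handler decides which task IDs to report, and the herder stops every local task whose connector appears in that report. The onlyDeferred flag switches between reporting all known tasks and reporting only the deferred ones. Records require Java 16 or later.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical end-to-end model comparing the reported behavior with the proposed change.
class DeferredOnlyFixModel {
    record TaskId(String connector, int task) {}

    // Which task IDs the commit handler reports as updated.
    static Set<TaskId> updatedTasksOnCommit(Map<TaskId, Map<String, String>> taskConfigs,
                                            Map<TaskId, Map<String, String>> deferred,
                                            boolean onlyDeferred) {
        taskConfigs.putAll(deferred);
        // Reported behavior: all known tasks; proposed change: only the deferred ones.
        return new HashSet<>(onlyDeferred ? deferred.keySet() : taskConfigs.keySet());
    }

    // Herder rule: stop every local task whose connector appears in the update set.
    static List<TaskId> tasksToStop(Set<TaskId> localTasks, Set<TaskId> updatedTasks) {
        Set<String> connectors = updatedTasks.stream()
                .map(TaskId::connector)
                .collect(Collectors.toSet());
        return localTasks.stream()
                .filter(id -> connectors.contains(id.connector()))
                .sorted(Comparator.comparing(TaskId::connector).thenComparingInt(TaskId::task))
                .collect(Collectors.toList());
    }

    // Scenario: connector "a" runs two tasks locally, then a new connector "b" commits one task.
    static List<TaskId> stopsWhenAddingConnector(boolean onlyDeferred) {
        Map<TaskId, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put(new TaskId("a", 0), Map.of());
        taskConfigs.put(new TaskId("a", 1), Map.of());
        Set<TaskId> localTasks = new HashSet<>(taskConfigs.keySet());
        Map<TaskId, Map<String, String>> deferred = Map.of(new TaskId("b", 0), Map.of());
        return tasksToStop(localTasks, updatedTasksOnCommit(taskConfigs, deferred, onlyDeferred));
    }

    public static void main(String[] args) {
        // prints: all tasks reported: stop [TaskId[connector=a, task=0], TaskId[connector=a, task=1]]
        System.out.println("all tasks reported: stop " + stopsWhenAddingConnector(false));
        // prints: deferred only:      stop []
        System.out.println("deferred only:      stop " + stopsWhenAddingConnector(true));
    }
}
```

In this model, a reconfigured connector still has its own local tasks restarted under the change, because its deferred task IDs name that connector; only unrelated connectors are left running.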

  was:When a connector is added or a connector configuration is changed, Kafka 
Connect 2.3.0 stops all existing tasks and then starts all tasks, both the new 
ones and the existing ones. This is unnecessary: only the new connector and its 
tasks need to be started, because rebalancing can be applied to both running 
and suspended tasks. The following patch fixes this problem so that only the 
new tasks and connectors are started.


> Avoid Stopping Unnecessary Connectors and Tasks 
> ------------------------------------------------
>
>                 Key: KAFKA-8676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8676
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>         Environment: centOS
>            Reporter: Luying Liu
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 2.3.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
