C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1469698231


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -620,6 +650,11 @@ private boolean startTask(
 
             try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
                 final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+                int maxTasks = connConfig.tasksMax();
+                int numTasks = configState.taskCount(id.connector());
+                checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());

Review Comment:
   This check handles the case where a connector generated too many tasks before the worker was upgraded to a version that includes the changes from this PR. We want to catch these cases because we may develop features later on (such as static assignment) that are incompatible with too many tasks, so we can't wait until the connector next generates task configs to perform the check (and potentially allow an excessive number of tasks to keep running in the meantime).
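   A minimal sketch of how such a guard could behave (the method name, log message, and exception type here are assumptions for illustration, not the PR's actual implementation):

```java
public class TasksMaxCheck {
    // Hypothetical stand-in for Worker.checkTasksMax: fail startup when
    // enforcement is enabled and the connector exceeds tasks.max, else warn.
    public static void checkTasksMax(String connector, int numTasks,
                                     int maxTasks, boolean enforce) {
        if (numTasks > maxTasks) {
            String msg = "Connector " + connector + " has generated " + numTasks
                    + " tasks, which exceeds tasks.max=" + maxTasks;
            if (enforce) {
                // Enforcement on: refuse to start the task
                throw new IllegalStateException(msg);
            } else {
                // Enforcement off: warn only (stand-in for a logger call)
                System.out.println("WARN: " + msg);
            }
        }
    }

    public static void main(String[] args) {
        checkTasksMax("my-connector", 3, 8, true);   // within limit: no-op
        checkTasksMax("my-connector", 10, 8, false); // over limit, not enforced: warns
    }
}
```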
   
   Regarding a stale config snapshot: great question, and it's taken me the last half hour of reviewing `DistributedHerder` code to get to the bottom of this, so please check my work.
   
   I don't believe this should be a problem, because the leader includes the latest offset it has read from the config topic during rebalances, and every worker makes sure it has read at least that far into the config topic before starting any newly-assigned connectors or tasks. See 
`handleRebalanceCompleted` 
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1761-L1764)
 for the worker checking its config snapshot offset against the leader's, 
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1776-L1787)
 for the read-to-end if the worker is lagging, and 
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1802) for where newly-assigned connectors and tasks are 
started. For the reporting of the latest offset read from the config topic 
during rebalance, see `WorkerCoordinator::metadata` 
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java#L182),
 where a fresh snapshot is taken right before the worker (re)joins the group.
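   The gating described above can be sketched as follows (assumed names; `DistributedHerder`'s real logic also reads the config topic to the end and refreshes its snapshot before re-checking):

```java
public class RebalanceOffsetCheck {
    /**
     * A worker may start newly-assigned connectors and tasks only once its
     * local config snapshot offset has caught up to the offset the leader
     * advertised in the rebalance assignment.
     */
    public static boolean safeToStart(long localConfigOffset, long leaderAssignmentOffset) {
        return localConfigOffset >= leaderAssignmentOffset;
    }

    public static void main(String[] args) {
        long leaderOffset = 42; // offset the leader had read before rebalancing
        long localOffset = 40;  // this worker's (stale) snapshot offset

        if (!safeToStart(localOffset, leaderOffset)) {
            // Stand-in for reading the config topic to the end and
            // refreshing the local snapshot before starting anything
            localOffset = leaderOffset;
        }
        System.out.println("safe to start: " + safeToStart(localOffset, leaderOffset));
    }
}
```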



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
