C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1469698231
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -620,6 +650,11 @@ private boolean startTask(
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
             final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+            int maxTasks = connConfig.tasksMax();
+            int numTasks = configState.taskCount(id.connector());
+            checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());

Review Comment:
   This check handles the case where a connector generated too many tasks before the worker was upgraded to a version that includes the changes from this PR. We want to catch these cases because we may later develop features (such as static assignment) that are incompatible with too many tasks, so we can't wait for the connector to generate new task configs before performing the check (which could allow an excessive number of tasks to keep running in the meantime).

   Regarding a stale config snapshot--great question, and it's taken me the last half hour of reviewing `DistributedHerder` code to get to the bottom of this, so please check my work. I don't believe this should be a problem, because the leader includes the latest offset it has read from the config topic during rebalances, and every worker makes sure it has read at least that far into the config topic before starting any newly-assigned connectors or tasks. See `handleRebalanceCompleted` [here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1761-L1764) for the worker checking its config snapshot offset against the leader's, [here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1776-L1787) for the read-to-end that runs if the worker is lagging, and [here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1802) for where newly-assigned connectors and tasks are started.

   For how the latest offset read from the config topic is reported during a rebalance, see `WorkerCoordinator::metadata` [here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java#L182), where a fresh snapshot is taken right before the worker (re)joins the group.
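   To make that flow concrete, here's a rough paraphrase of the logic in `handleRebalanceCompleted` (a simplified sketch, not the literal code; `readConfigToEnd` and `startWork` are the real method names, everything else is condensed):

```java
// Paraphrased sketch of DistributedHerder::handleRebalanceCompleted; simplified,
// not the literal code. The worker compares its config snapshot offset against
// the offset the leader advertised in the rebalance assignment.
if (configState.offset() < assignment.offset()) {
    // The local snapshot is stale: read the config topic to the end before
    // acting on the assignment, so we never start connectors or tasks from
    // older task configs than the leader has already seen.
    if (!readConfigToEnd(workerSyncTimeoutMs)) {
        // Couldn't catch up in time; rejoin the group instead of starting
        // work with a stale snapshot.
        return;
    }
}
startWork(); // newly-assigned connectors and tasks are only started here
```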
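   And for reference, the new guard at task startup has roughly this shape (an illustrative sketch only; the exact body, exception type, and message text of `checkTasksMax` in this PR may differ):

```java
// Illustrative sketch; not necessarily the method body in this PR. Assumes
// org.apache.kafka.connect.errors.ConnectException and Worker's static logger.
private static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
    if (numTasks > maxTasks) {
        String msg = "Connector " + connName + " has generated " + numTasks
                + " tasks, which is greater than its configured tasks.max of " + maxTasks;
        if (enforce) {
            // Enforced mode: refuse to bring up the task
            throw new ConnectException(msg);
        } else {
            // Legacy mode: log a warning but allow the task to run
            log.warn(msg);
        }
    }
}
```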