ramesh-muthusamy commented on a change in pull request #9319: URL: https://github.com/apache/kafka/pull/9319#discussion_r568350269
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -554,15 +570,19 @@ private void resetDelay() { // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0 log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors); + int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum); + log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors); + log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); int floorTasks = totalActiveTasksNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks); + int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum); + log.debug("New average number of tasks per worker: floor= {}, ceiling= {}", floorTasks, ceilTasks); + int numToRevoke; - int numToRevoke = floorConnectors; for (WorkerLoad existing : existingWorkers) { Iterator<String> connectors = existing.connectors().iterator(); + numToRevoke = existing.connectorsSize() - ceilConnectors; Review comment: Removed the if block, thanks for pointing that out.
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -260,7 +259,6 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Do not revoke resources for re-assignment while a delayed rebalance is active // Also we do not revoke in two consecutive rebalances by the same leader canRevoke = delay == 0 && canRevoke; - Review comment: Thanks, resolved it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org