ramesh-muthusamy commented on a change in pull request #9319: URL: https://github.com/apache/kafka/pull/9319#discussion_r568350269
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -554,15 +570,19 @@ private void resetDelay() { // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0 log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors); + int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum); + log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors); + log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); int floorTasks = totalActiveTasksNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks); + int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum); + log.debug("New average number of tasks per worker: floor= {}, ceiling= {}", floorTasks, ceilTasks); + int numToRevoke; - int numToRevoke = floorConnectors; for (WorkerLoad existing : existingWorkers) { Iterator<String> connectors = existing.connectors().iterator(); + numToRevoke = existing.connectorsSize() - ceilConnectors; Review comment: removed the if block , thanks for pointing out. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org