vamossagar12 commented on code in PR #12561: URL: https://github.com/apache/kafka/pull/12561#discussion_r962288275
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment( existing.tasks().addAll(assignment.tasks()); } ); - canRevoke = toExplicitlyRevoke.size() == 0; + + // If this round and the previous round involved revocation, we will do an exponential + // backoff delay to prevent rebalance storms. + if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) { + numSuccessiveRevokingRebalances++; + delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances); Review Comment: Removed. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment( existing.tasks().addAll(assignment.tasks()); } ); - canRevoke = toExplicitlyRevoke.size() == 0; + + // If this round and the previous round involved revocation, we will do an exponential + // backoff delay to prevent rebalance storms. + if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) { + numSuccessiveRevokingRebalances++; + delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances); + log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay); + scheduledRebalance = time.milliseconds() + delay; + } else if (!toExplicitlyRevoke.isEmpty()) { + // We had a revocation in this round but not in the previous round. Let's store that state. + log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true"); + revokedInPrevious = true; + } else if (revokedInPrevious) { + // No revocations in this round but the previous round had one. Probably the workers + // have converged to a balanced load. We can reset the rebalance clock + log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " + + "balanced load. 
Resetting the exponential backoff clock"); + numSuccessiveRevokingRebalances = 0; + revokedInPrevious = false; + } else { + // revokedInPrevious is false and no revocations needed in this round. no-op. + log.debug("No revocations in previous and current round."); + } } else { - canRevoke = delay == 0; + log.debug("Connector and task to revoke assignments: {}", toRevoke); Review Comment: Added relevant logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org