vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288275


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we 
will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) 
consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);

Review Comment:
   Removed.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we 
will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) 
consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to 
wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous 
round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious 
flag to true");
+                revokedInPrevious = true;
+            } else if (revokedInPrevious) {
+                // No revocations in this round but the previous round had 
one. Probably the workers
+                // have converged to a balanced load. We can reset the 
rebalance clock
+                log.debug("Previous round had revocations but this round 
didn't. Probably, the cluster has reached a " +
+                        "balanced load. Resetting the exponential backoff 
clock");
+                numSuccessiveRevokingRebalances = 0;
+                revokedInPrevious = false;
+            } else {
+                // revokedInPrevious is false and no revocations needed in 
this round. no-op.
+                log.debug("No revocations in previous and current round.");
+            }
         } else {
-            canRevoke = delay == 0;
+            log.debug("Connector and task to revoke assignments: {}", 
toRevoke);

Review Comment:
   Added relevant logs..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to