kkonstantine commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r512487036
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset,
WorkerCoordinator coordinator) {
// Do not revoke resources for re-assignment while a delayed rebalance is active
// Also we do not revoke in two consecutive rebalances by the same leader
canRevoke = delay == 0 && canRevoke;
-
+ log.debug("Connector and task to revoke assignment post load balancer calculation: {}", toRevoke);
Review comment:
Why do we print the assignment again here? It doesn't seem to change since we already log it [in line 249](https://github.com/apache/kafka/pull/9319/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4L249).
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks
lostAssignments,
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
// delayed rebalance expired and it's time to assign resources
log.debug("Delayed rebalance expired. Reassigning lost tasks");
- Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+ List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
if (!candidateWorkersForReassignment.isEmpty()) {
candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
}
- if (candidateWorkerLoad.isPresent()) {
- WorkerLoad workerLoad = candidateWorkerLoad.get();
- log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker());
- lostAssignments.connectors().forEach(workerLoad::assign);
- lostAssignments.tasks().forEach(workerLoad::assign);
+ if (!candidateWorkerLoad.isEmpty()) {
+ log.debug("Assigning lost tasks to {} candidate workers: {}",
+ candidateWorkerLoad.size(),
+ candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+ Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
+ for (String connector : lostAssignments.connectors()) {
+ // Loop over the candidate workers as many times as it takes
Review comment:
So we now balance the lost tasks, but only among the new workers. Have you checked how this interacts with the task revocation that is called right after?
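To make the balancing behavior under discussion concrete, here is a standalone sketch (not the PR's actual code; the class and method names are illustrative, and plain strings stand in for `WorkerLoad` and the lost connectors/tasks) of the looping-iterator pattern the diff uses: lost resources are spread round-robin over the candidate workers only, restarting the iterator whenever it is exhausted, so any imbalance relative to the existing workers is left to a later revocation round.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinLostAssignmentSketch {

    // Distribute lost resources round-robin over the candidate workers,
    // restarting the iterator whenever it runs out of workers.
    static Map<String, Integer> assignRoundRobin(List<String> lostTasks,
                                                 List<String> candidates) {
        Map<String, Integer> load = new LinkedHashMap<>();
        candidates.forEach(w -> load.put(w, 0));
        Iterator<String> it = candidates.iterator();
        for (String task : lostTasks) {
            if (!it.hasNext()) {
                // Exhausted: loop over the candidate workers again.
                it = candidates.iterator();
            }
            load.merge(it.next(), 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // Five hypothetical lost tasks, two candidate workers.
        System.out.println(assignRoundRobin(
                Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4"),
                Arrays.asList("workerA", "workerB")));
        // → {workerA=3, workerB=2}
    }
}
```

Note that the existing workers' loads never enter this loop, which is exactly the reviewer's concern: the result is balanced among the candidates, not across the whole cluster.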
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
numToRevoke = floorTasks;
for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+ numToRevoke = existing.tasksSize() - ceilTasks;
Review comment:
Also, we apply this logic to tasks only. Why not to the connectors too, if it helps?
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +595,14 @@ private void resetDelay() {
numToRevoke = floorTasks;
Review comment:
This assignment is unused now.
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
numToRevoke = floorTasks;
for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+ numToRevoke = existing.tasksSize() - ceilTasks;
Review comment:
Can you explain a bit what you aim to achieve with this change here?
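For context on the question above, a minimal sketch of what the per-worker computation appears to do, assuming `ceilTasks` is the ceiling of the average per-worker task count: instead of revoking the same `floorTasks` count from every worker, each worker gives up only its excess above the balanced ceiling. The method name `revokeCount` and the clamp to zero for workers already at or below the ceiling are illustrative assumptions, not taken from the PR.

```java
public class RevocationCountSketch {

    // Per-worker revocation count: the worker's tasks above the
    // balanced per-worker ceiling (clamped at zero; the clamp is an
    // assumption for workers at or below the ceiling).
    static int revokeCount(int workerTaskCount, int totalTasks, int totalWorkers) {
        int ceilTasks = (int) Math.ceil((double) totalTasks / totalWorkers);
        return Math.max(0, workerTaskCount - ceilTasks);
    }

    public static void main(String[] args) {
        // 7 tasks across 3 workers -> balanced ceiling is 3.
        System.out.println(revokeCount(5, 7, 3)); // worker with 5 tasks gives up 2
        System.out.println(revokeCount(2, 7, 3)); // worker with 2 tasks gives up 0
    }
}
```

Under this reading, an already-balanced worker is untouched, whereas a fixed `numToRevoke = floorTasks` would revoke the same amount from every worker regardless of its current load.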
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]