J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152783490


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 
       // Re-calculate the target assignment if IdealState changed while 
waiting for ExternalView to converge
       ZNRecord idealStateRecord = idealState.getRecord();
-      if (idealStateRecord.getVersion() != expectedVersion) {
-        tableRebalanceLogger.info(
-            "IdealState version changed while waiting for ExternalView to 
converge, re-calculating the target "
-                + "assignment");
-        Map<String, Map<String, String>> oldAssignment = currentAssignment;
-        currentAssignment = idealStateRecord.getMapFields();
-        expectedVersion = idealStateRecord.getVersion();
-
-        // If all the segments to be moved remain unchanged (same instance 
state map) in the new ideal state, apply the
-        // same target instance state map for these segments to the new ideal 
state as the target assignment
-        boolean segmentsToMoveChanged = false;
-        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
-          // For StrictRealtimeSegmentAssignment, we need to recompute the 
target assignment because the assignment for
-          // new added segments is based on the existing assignment
-          segmentsToMoveChanged = true;
-        } else {
-          for (String segment : segmentsToMove) {
-            Map<String, String> oldInstanceStateMap = 
oldAssignment.get(segment);
-            Map<String, String> currentInstanceStateMap = 
currentAssignment.get(segment);
-            // TODO: Consider allowing segment state change from CONSUMING to 
ONLINE
-            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-              tableRebalanceLogger.info(
-                  "Segment state changed in IdealState from: {} to: {} for 
segment: {}, re-calculating the target "
-                      + "assignment based on the new IdealState",
-                  oldInstanceStateMap, currentInstanceStateMap, segment);
-              segmentsToMoveChanged = true;
-              break;
+      Map<String, Map<String, String>> nextAssignment;
+      boolean needsRecalculation;
+      boolean shouldForceCommit = forceCommitBeforeMoved;
+
+      do {
+        needsRecalculation = false;
+
+        // Step 1: Handle version mismatch and recalculate if needed
+        if (idealStateRecord.getVersion() != expectedVersion) {
+          tableRebalanceLogger.info(
+              "IdealState version changed while waiting for ExternalView to 
converge, re-calculating the target "
+                  + "assignment");
+          Map<String, Map<String, String>> oldAssignment = currentAssignment;
+          currentAssignment = idealStateRecord.getMapFields();
+          expectedVersion = idealStateRecord.getVersion();
+
+          // If all the segments to be moved remain unchanged (same instance 
state map) in the new ideal state, apply
+          // the same target instance state map for these segments to the new 
ideal state as the target assignment
+          boolean segmentsToMoveChanged = false;
+          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+            // For StrictRealtimeSegmentAssignment, we need to recompute the 
target assignment because the assignment
+            // for new added segments is based on the existing assignment
+            segmentsToMoveChanged = true;
+          } else {
+            for (String segment : segmentsToMove) {
+              Map<String, String> oldInstanceStateMap = 
oldAssignment.get(segment);
+              Map<String, String> currentInstanceStateMap = 
currentAssignment.get(segment);
+              // TODO: Consider allowing segment state change from CONSUMING 
to ONLINE
+              if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+                tableRebalanceLogger.info(
+                    "Segment state changed in IdealState from: {} to: {} for 
segment: {}, re-calculating the target "
+                        + "assignment based on the new IdealState",
+                    oldInstanceStateMap, currentInstanceStateMap, segment);
+                segmentsToMoveChanged = true;
+                break;
+              }
             }
           }
-        }
-        if (segmentsToMoveChanged) {
-          try {
-            // Re-calculate the instance partitions in case the instance 
configs changed during the rebalance
-            instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, 
bootstrap, false, minimizeDataMovement,
-                    tableRebalanceLogger).getLeft();
-            tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, false,
-                    minimizeDataMovement, tableRebalanceLogger).getLeft();
-            targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
-                tierToInstancePartitionsMap, rebalanceConfig);
-          } catch (Exception e) {
-            onReturnFailure("Caught exception while re-calculating the target 
assignment, aborting the rebalance", e,
-                tableRebalanceLogger);
-            return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
-                "Caught exception while re-calculating the target assignment: 
" + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, 
preChecksResult, summaryResult);
+          if (segmentsToMoveChanged) {
+            try {
+              // Re-calculate the instance partitions in case the instance 
configs changed during the rebalance
+              instancePartitionsMap =
+                  getInstancePartitionsMap(tableConfig, reassignInstances, 
bootstrap, false, minimizeDataMovement,
+                      tableRebalanceLogger).getLeft();
+              tierToInstancePartitionsMap =
+                  getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, false,
+                      minimizeDataMovement, tableRebalanceLogger).getLeft();
+              targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
+                  tierToInstancePartitionsMap, rebalanceConfig);
+            } catch (Exception e) {
+              onReturnFailure("Caught exception while re-calculating the 
target assignment, aborting the rebalance", e,
+                  tableRebalanceLogger);
+              return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
+                  "Caught exception while re-calculating the target 
assignment: " + e, instancePartitionsMap,
+                  tierToInstancePartitionsMap, targetAssignment, 
preChecksResult, summaryResult);
+            }
+          } else {
+            tableRebalanceLogger.info(
+                "No state change found for segments to be moved, 
re-calculating the target assignment based on the "
+                    + "previous target assignment");
+            Map<String, Map<String, String>> oldTargetAssignment = 
targetAssignment;
+            // Other instance assignment code returns a TreeMap to keep it 
sorted, doing the same here
+            targetAssignment = new TreeMap<>(currentAssignment);
+            for (String segment : segmentsToMove) {
+              targetAssignment.put(segment, oldTargetAssignment.get(segment));
+            }
           }
-        } else {
-          tableRebalanceLogger.info(
-              "No state change found for segments to be moved, re-calculating 
the target assignment based on the "
-                  + "previous target assignment");
-          Map<String, Map<String, String>> oldTargetAssignment = 
targetAssignment;
-          // Other instance assignment code returns a TreeMap to keep it 
sorted, doing the same here
-          targetAssignment = new TreeMap<>(currentAssignment);
-          for (String segment : segmentsToMove) {
-            targetAssignment.put(segment, oldTargetAssignment.get(segment));
+        }
+
+        // Step 2: Handle force commit if flag is set, then recalculate if 
force commit occurred
+        if (shouldForceCommit) {
+          nextAssignment =
+              getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+                  lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger);
+          Set<String> consumingSegmentsToMoveNext = 
getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+          if (!consumingSegmentsToMoveNext.isEmpty()) {

Review Comment:
   Before batching was introduced, yes. With batching, a single step is not 
guaranteed to include all of the segments to be moved, so there might be a 
consuming segment that does not move at all in the first step.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to