yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152152167


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);

Review Comment:
   Looks like these are also the default values. Instead of hardcoding them 
here separately, could you refactor these out into common constants and share 
them across both the call sites?
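   A minimal sketch of the suggested refactor, assuming hypothetical constant names (the real ones would live next to `ForceCommitBatchConfig` or wherever the existing defaults are defined):

```java
// Hypothetical sketch: hoist the repeated force-commit batch literals into
// shared constants so both call sites reference a single definition.
// Class and constant names are assumptions, not taken from the PR.
public class ForceCommitDefaults {
  public static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
  public static final int DEFAULT_STATUS_CHECK_INTERVAL_SEC = 5;
  public static final int DEFAULT_STATUS_CHECK_TIMEOUT_SEC = 180;

  public static void main(String[] args) {
    // Each call site would then build the config as:
    // ForceCommitBatchConfig.of(DEFAULT_BATCH_SIZE, DEFAULT_STATUS_CHECK_INTERVAL_SEC,
    //     DEFAULT_STATUS_CHECK_TIMEOUT_SEC);
    System.out.println(DEFAULT_BATCH_SIZE + " " + DEFAULT_STATUS_CHECK_INTERVAL_SEC + " "
        + DEFAULT_STATUS_CHECK_TIMEOUT_SEC);
  }
}
```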



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
+    boolean forceCommitBeforeMoved =
+        tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();

Review Comment:
   The force commit config defaults to `false`, so if a user is explicitly 
setting it to `true` for an `OFFLINE` table we should throw some sort of 
validation error to the user informing them that this config is only applicable 
to `REALTIME` tables instead of silently ignoring it IMO.
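   A minimal sketch of that validation, with a stand-in enum (the real check would use Pinot's `TableType` and live in the rebalance config validation path):

```java
// Hypothetical sketch of the suggested validation: reject forceCommitBeforeMoved
// when the table is not REALTIME instead of silently ignoring the flag.
// The nested enum is a stand-in for org.apache.pinot.spi.config.table.TableType.
public class ForceCommitValidation {
  enum TableType { OFFLINE, REALTIME }

  static void validate(TableType tableType, boolean forceCommitBeforeMoved) {
    if (forceCommitBeforeMoved && tableType != TableType.REALTIME) {
      throw new IllegalArgumentException(
          "forceCommitBeforeMoved is only applicable to REALTIME tables, got: " + tableType);
    }
  }

  public static void main(String[] args) {
    validate(TableType.REALTIME, true);  // accepted
    validate(TableType.OFFLINE, false);  // accepted: flag not set
    try {
      validate(TableType.OFFLINE, true); // should be rejected
      throw new AssertionError("expected IllegalArgumentException");
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
```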



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {

Review Comment:
   I'm not too familiar with the realtime consumption side of things here - in 
what scenarios would one replica be `CONSUMING` when another replica for the 
same segment is `ONLINE` in the ideal state? I thought that scenario should 
only have been possible in the external view, no?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }

Review Comment:
   I think we should fail the rebalance job in this case instead of simply 
logging a warning and proceeding with the rest of the rebalance. If a user 
configures the force commit rebalance config to `true`, it should be honored 
and we shouldn't just ignore a failure (or timeout) during force commit IMO.
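   A minimal sketch of propagating the failure instead of warning (the interface and exception wrapping here are illustrative, not the PR's code):

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: rethrow instead of warn, so doRebalance can catch the
// exception and return RebalanceResult.Status.FAILED for the rebalance job.
public class ForceCommitWait {
  // Stand-in for the waitUntilPrevBatchIsComplete call on the segment manager.
  interface BatchWaiter {
    void waitUntilPrevBatchIsComplete() throws Exception;
  }

  static void waitOrFail(BatchWaiter waiter) {
    try {
      waiter.waitUntilPrevBatchIsComplete();
    } catch (Exception e) {
      // Previously: tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
      throw new RuntimeException("Force commit did not complete, failing the rebalance", e);
    }
  }

  public static void main(String[] args) {
    try {
      waitOrFail(() -> { throw new TimeoutException("segments not DONE in time"); });
      throw new AssertionError("expected RuntimeException");
    } catch (RuntimeException expected) {
      System.out.println(expected.getMessage());
    }
  }
}
```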



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
 
       // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
       ZNRecord idealStateRecord = idealState.getRecord();
-      if (idealStateRecord.getVersion() != expectedVersion) {
-        tableRebalanceLogger.info(
-            "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
-                + "assignment");
-        Map<String, Map<String, String>> oldAssignment = currentAssignment;
-        currentAssignment = idealStateRecord.getMapFields();
-        expectedVersion = idealStateRecord.getVersion();
-
-        // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
-        // same target instance state map for these segments to the new ideal state as the target assignment
-        boolean segmentsToMoveChanged = false;
-        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
-          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
-          // new added segments is based on the existing assignment
-          segmentsToMoveChanged = true;
-        } else {
-          for (String segment : segmentsToMove) {
-            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
-            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
-            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
-            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-              tableRebalanceLogger.info(
-                  "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
-                      + "assignment based on the new IdealState",
-                  oldInstanceStateMap, currentInstanceStateMap, segment);
-              segmentsToMoveChanged = true;
-              break;
+      Map<String, Map<String, String>> nextAssignment;
+      boolean needsRecalculation;
+      boolean shouldForceCommit = forceCommitBeforeMoved;
+
+      do {
+        needsRecalculation = false;
+
+        // Step 1: Handle version mismatch and recalculate if needed
+        if (idealStateRecord.getVersion() != expectedVersion) {
+          tableRebalanceLogger.info(
+              "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
+                  + "assignment");
+          Map<String, Map<String, String>> oldAssignment = currentAssignment;
+          currentAssignment = idealStateRecord.getMapFields();
+          expectedVersion = idealStateRecord.getVersion();
+
+          // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply
+          // the same target instance state map for these segments to the new ideal state as the target assignment
+          boolean segmentsToMoveChanged = false;
+          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+            // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment
+            // for new added segments is based on the existing assignment
+            segmentsToMoveChanged = true;
+          } else {
+            for (String segment : segmentsToMove) {
+              Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+              Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+              // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+              if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+                tableRebalanceLogger.info(
+                    "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
+                        + "assignment based on the new IdealState",
+                    oldInstanceStateMap, currentInstanceStateMap, segment);
+                segmentsToMoveChanged = true;
+                break;
+              }
             }
           }
-        }
-        if (segmentsToMoveChanged) {
-          try {
-            // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-            instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
-                    tableRebalanceLogger).getLeft();
-            tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
-                    minimizeDataMovement, tableRebalanceLogger).getLeft();
-            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
-                tierToInstancePartitionsMap, rebalanceConfig);
-          } catch (Exception e) {
-            onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
-                tableRebalanceLogger);
-            return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-                "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+          if (segmentsToMoveChanged) {
+            try {
+              // Re-calculate the instance partitions in case the instance configs changed during the rebalance
+              instancePartitionsMap =
+                  getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
+                      tableRebalanceLogger).getLeft();
+              tierToInstancePartitionsMap =
+                  getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
+                      minimizeDataMovement, tableRebalanceLogger).getLeft();
+              targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
+                  tierToInstancePartitionsMap, rebalanceConfig);
+            } catch (Exception e) {
+              onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
+                  tableRebalanceLogger);
+              return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+                  "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
+                  tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+            }
+          } else {
+            tableRebalanceLogger.info(
+                "No state change found for segments to be moved, re-calculating the target assignment based on the "
+                    + "previous target assignment");
+            Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
+            // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
+            targetAssignment = new TreeMap<>(currentAssignment);
+            for (String segment : segmentsToMove) {
+              targetAssignment.put(segment, oldTargetAssignment.get(segment));
+            }
           }
-        } else {
-          tableRebalanceLogger.info(
-              "No state change found for segments to be moved, re-calculating the target assignment based on the "
-                  + "previous target assignment");
-          Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
-          // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
-          targetAssignment = new TreeMap<>(currentAssignment);
-          for (String segment : segmentsToMove) {
-            targetAssignment.put(segment, oldTargetAssignment.get(segment));
+        }
+
+        // Step 2: Handle force commit if flag is set, then recalculate if force commit occurred
+        if (shouldForceCommit) {
+          nextAssignment =
+              getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+                  lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger);
+          Set<String> consumingSegmentsToMoveNext = getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+          if (!consumingSegmentsToMoveNext.isEmpty()) {
+            tableRebalanceLogger.info("Force committing {} consuming segments before moving them",
+                consumingSegmentsToMoveNext.size());
+            needsRecalculation = true;
+            _tableRebalanceObserver.onTrigger(
+                TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER, null, null,
+                null);
+            idealState =
+                forceCommitConsumingSegmentsAndWait(tableNameWithType, consumingSegmentsToMoveNext,
+                    tableRebalanceLogger);
+            idealStateRecord = idealState.getRecord();
+            _tableRebalanceObserver.onTrigger(
+                TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_END_TRIGGER, null, null,
+                new TableRebalanceObserver.RebalanceContext(consumingSegmentsToMoveNext.size()));
           }
+          shouldForceCommit = false; // Only attempt force commit once
         }
-      }
+      } while (needsRecalculation);

Review Comment:
   I'm not sure I understand this do-while loop on `needsRecalculation` - is it 
to recalculate the `currentAssignment`, `targetAssignment`, `expectedVersion` 
etc. if the force commit is done? But since we're using an additional 
`shouldForceCommit` local variable to ensure that the force commit part can 
only be done once, we're essentially only doing Step 1 -> Step 2 -> Step 1 
right (and no further iterations should be possible)? If that's the right 
understanding, I think it might be cleaner and more readable to refactor the 
ideal state version mismatch handling out into a separate function that can be 
called after Step 2 if needed instead of having this do-while loop. What do you 
think?
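   A control-flow sketch of the proposed refactor (all names hypothetical): extracting the Step 1 version-mismatch handling into a helper makes the at-most-once Step 1 -> Step 2 -> Step 1 sequence explicit and drops the do-while:

```java
// Hypothetical sketch of the suggested structure: recalculation runs once, then
// the one-shot force commit, then recalculation once more if the commit bumped
// the IdealState version. No loop variable needed. Versions stand in for the
// real assignment recomputation.
public class RebalanceStepSketch {
  int _expectedVersion;
  int _idealStateVersion;
  boolean _forceCommitPending;

  RebalanceStepSketch(int version, boolean forceCommit) {
    _expectedVersion = version;
    _idealStateVersion = version;
    _forceCommitPending = forceCommit;
  }

  // Stand-in for the extracted "Step 1" (IdealState version mismatch handling).
  void recalculateIfVersionChanged() {
    if (_idealStateVersion != _expectedVersion) {
      _expectedVersion = _idealStateVersion; // recompute target assignment here
    }
  }

  void runOnePass() {
    recalculateIfVersionChanged();   // Step 1
    if (_forceCommitPending) {       // Step 2, at most once
      _idealStateVersion++;          // force commit updates the IdealState
      _forceCommitPending = false;
      recalculateIfVersionChanged(); // Step 1 again, no do-while required
    }
  }

  public static void main(String[] args) {
    RebalanceStepSketch sketch = new RebalanceStepSketch(3, true);
    sketch.runOnePass();
    System.out.println(sketch._expectedVersion); // 4: recalculated after the commit
  }
}
```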



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
 
       // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
       ZNRecord idealStateRecord = idealState.getRecord();
-      if (idealStateRecord.getVersion() != expectedVersion) {
-        tableRebalanceLogger.info(
-            "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
-                + "assignment");
-        Map<String, Map<String, String>> oldAssignment = currentAssignment;
-        currentAssignment = idealStateRecord.getMapFields();
-        expectedVersion = idealStateRecord.getVersion();
-
-        // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
-        // same target instance state map for these segments to the new ideal state as the target assignment
-        boolean segmentsToMoveChanged = false;
-        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
-          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
-          // new added segments is based on the existing assignment
-          segmentsToMoveChanged = true;
-        } else {
-          for (String segment : segmentsToMove) {
-            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
-            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
-            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
-            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-              tableRebalanceLogger.info(
-                  "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
-                      + "assignment based on the new IdealState",
-                  oldInstanceStateMap, currentInstanceStateMap, segment);
-              segmentsToMoveChanged = true;
-              break;
+      Map<String, Map<String, String>> nextAssignment;
+      boolean needsRecalculation;
+      boolean shouldForceCommit = forceCommitBeforeMoved;
+
+      do {
+        needsRecalculation = false;
+
+        // Step 1: Handle version mismatch and recalculate if needed
+        if (idealStateRecord.getVersion() != expectedVersion) {
+          tableRebalanceLogger.info(
+              "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
+                  + "assignment");
+          Map<String, Map<String, String>> oldAssignment = currentAssignment;
+          currentAssignment = idealStateRecord.getMapFields();
+          expectedVersion = idealStateRecord.getVersion();
+
+          // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply
+          // the same target instance state map for these segments to the new ideal state as the target assignment
+          boolean segmentsToMoveChanged = false;
+          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+            // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment
+            // for new added segments is based on the existing assignment
+            segmentsToMoveChanged = true;
+          } else {
+            for (String segment : segmentsToMove) {
+              Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+              Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+              // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+              if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+                tableRebalanceLogger.info(
+                    "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
+                        + "assignment based on the new IdealState",
+                    oldInstanceStateMap, currentInstanceStateMap, segment);
+                segmentsToMoveChanged = true;
+                break;
+              }
             }
           }
-        }
-        if (segmentsToMoveChanged) {
-          try {
-            // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-            instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
-                    tableRebalanceLogger).getLeft();
-            tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
-                    minimizeDataMovement, tableRebalanceLogger).getLeft();
-            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
-                tierToInstancePartitionsMap, rebalanceConfig);
-          } catch (Exception e) {
-            onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
-                tableRebalanceLogger);
-            return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-                "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+          if (segmentsToMoveChanged) {
+            try {
+              // Re-calculate the instance partitions in case the instance configs changed during the rebalance
+              instancePartitionsMap =
+                  getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
+                      tableRebalanceLogger).getLeft();
+              tierToInstancePartitionsMap =
+                  getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
+                      minimizeDataMovement, tableRebalanceLogger).getLeft();
+              targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
+                  tierToInstancePartitionsMap, rebalanceConfig);
+            } catch (Exception e) {
+              onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
+                  tableRebalanceLogger);
+              return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+                  "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
+                  tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+            }
+          } else {
+            tableRebalanceLogger.info(
+                "No state change found for segments to be moved, re-calculating the target assignment based on the "
+                    + "previous target assignment");
+            Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
+            // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
+            targetAssignment = new TreeMap<>(currentAssignment);
+            for (String segment : segmentsToMove) {
+              targetAssignment.put(segment, oldTargetAssignment.get(segment));
+            }
           }
-        } else {
-          tableRebalanceLogger.info(
-              "No state change found for segments to be moved, re-calculating the target assignment based on the "
-                  + "previous target assignment");
-          Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
-          // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
-          targetAssignment = new TreeMap<>(currentAssignment);
-          for (String segment : segmentsToMove) {
-            targetAssignment.put(segment, oldTargetAssignment.get(segment));
+        }
+
+        // Step 2: Handle force commit if flag is set, then recalculate if force commit occurred
+        if (shouldForceCommit) {
+          nextAssignment =
+              getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+                  lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger);
+          Set<String> consumingSegmentsToMoveNext = getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+          if (!consumingSegmentsToMoveNext.isEmpty()) {
Review Comment:
   We're doing this before each rebalance step, but if I understand correctly, 
for non-upsert realtime tables this will actually only happen in the first 
rebalance step because in subsequent steps the new consuming segments would 
already have been created on the right target servers (and 
`consumingSegmentsToMoveNext` would be empty here in subsequent rebalance 
steps) based on the instance partitions change, correct?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }

Review Comment:
   Unrelated to this PR, but IMO we should either deprecate the CLI 
command-based table rebalance or else refactor it to use the controller API to 
trigger the table rebalance (and deprecate the ZK-based direct trigger). It 
doesn't make sense to maintain these two separate pathways to rebalances and 
these differences will only accumulate if we don't consolidate it into the 
controller path.
   
   cc - @somandal @Jackie-Jiang 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -151,6 +151,26 @@ public void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentS
           updatedStatsInZk = true;
         }
         break;
+      case FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER:
+        LOGGER.info("triggered force commit for consuming segments for table: {}",

Review Comment:
   nit: this log seems redundant because we're already logging something 
similar in `TableRebalancer`.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }

Review Comment:
   From what I can tell, this should only be possible if someone enables the force commit rebalance config when triggering a rebalance using the `RebalanceTable` CLI command, right? IMO we should add a validation to `PinotTableRebalancer` and fail if the config is set, with a message saying that it's only supported via the controller API (and also update this path to throw instead of logging a warning).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
       @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the 
rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are 
rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,

Review Comment:
   I'm not sure `forceCommitBeforeMoved` is the best name for this parameter - how about simply `forceCommitConsumingSegments` instead? And we could either keep the current description or enhance it to mention that multiple sets of consuming segments could be force committed, depending on the number of steps in the rebalance.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
       _availableInstances = availableInstances;
     }
   }
+
+  @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {

Review Comment:
   Why is `segmentsToCommit` marked as `Nullable`? `getMovingConsumingSegments` can only return an empty set, not `null`, right?
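   To illustrate the point, here is a self-contained stand-in for the selection logic (simplified, with the `SegmentStateModel` constants inlined as plain strings; not the actual Pinot code): the method always returns a set, possibly empty, never `null`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MovingConsumingSegments {
  // Simplified stand-in for TableRebalancer.getMovingConsumingSegments:
  // a segment is "moving consuming" when its target replicas contain
  // CONSUMING (and no ONLINE) and the hosting instance set changes.
  static Set<String> get(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> target = targetAssignment.get(entry.getKey());
      if (target != null
          && target.values().stream().noneMatch("ONLINE"::equals)
          && target.values().stream().anyMatch("CONSUMING"::equals)
          && !entry.getValue().keySet().equals(target.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving; // always a (possibly empty) set, never null
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = Map.of("seg__0", Map.of("server1", "CONSUMING"));
    Map<String, Map<String, String>> target = Map.of("seg__0", Map.of("server2", "CONSUMING"));
    System.out.println(get(current, target)); // [seg__0]
    System.out.println(get(current, current)); // [] -- empty, not null
  }
}
```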



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -473,6 +492,10 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     // StrictReplicaGroupAssignment::rebalanceTable() and similar limitations apply here as well
     Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>();
 
+    boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+    PartitionIdFetcher partitionIdFetcher =
+        new PartitionIdFetcherImpl(tableNameWithType, TableConfigUtils.getPartitionColumn(tableConfig), _helixManager,
+            isStrictRealtimeSegmentAssignment);
    // We repeat the following steps until the target assignment is reached:
    // 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
    //    timeout.

Review Comment:
   Could we update these comments as well? We should include the force commit related changes from this PR, as well as your previous EV-IS convergence dynamic timeout change (the first point here still says that rebalance fails if it doesn't converge within the timeout).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

