J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152783490
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
ZNRecord idealStateRecord = idealState.getRecord();
- if (idealStateRecord.getVersion() != expectedVersion) {
- tableRebalanceLogger.info(
- "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
- + "assignment");
- Map<String, Map<String, String>> oldAssignment = currentAssignment;
- currentAssignment = idealStateRecord.getMapFields();
- expectedVersion = idealStateRecord.getVersion();
-
- // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply the
- // same target instance state map for these segments to the new ideal
state as the target assignment
- boolean segmentsToMoveChanged = false;
- if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
- // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment for
- // new added segments is based on the existing assignment
- segmentsToMoveChanged = true;
- } else {
- for (String segment : segmentsToMove) {
- Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
- Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
- // TODO: Consider allowing segment state change from CONSUMING to
ONLINE
- if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
- tableRebalanceLogger.info(
- "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
- + "assignment based on the new IdealState",
- oldInstanceStateMap, currentInstanceStateMap, segment);
- segmentsToMoveChanged = true;
- break;
+ Map<String, Map<String, String>> nextAssignment;
+ boolean needsRecalculation;
+ boolean shouldForceCommit = forceCommitBeforeMoved;
+
+ do {
+ needsRecalculation = false;
+
+ // Step 1: Handle version mismatch and recalculate if needed
+ if (idealStateRecord.getVersion() != expectedVersion) {
+ tableRebalanceLogger.info(
+ "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
+ + "assignment");
+ Map<String, Map<String, String>> oldAssignment = currentAssignment;
+ currentAssignment = idealStateRecord.getMapFields();
+ expectedVersion = idealStateRecord.getVersion();
+
+ // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply
+ // the same target instance state map for these segments to the new
ideal state as the target assignment
+ boolean segmentsToMoveChanged = false;
+ if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment
+ // for new added segments is based on the existing assignment
+ segmentsToMoveChanged = true;
+ } else {
+ for (String segment : segmentsToMove) {
+ Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
+ Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
+ // TODO: Consider allowing segment state change from CONSUMING
to ONLINE
+ if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+ tableRebalanceLogger.info(
+ "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
+ + "assignment based on the new IdealState",
+ oldInstanceStateMap, currentInstanceStateMap, segment);
+ segmentsToMoveChanged = true;
+ break;
+ }
}
}
- }
- if (segmentsToMoveChanged) {
- try {
- // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
- instancePartitionsMap =
- getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
- tableRebalanceLogger).getLeft();
- tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
- minimizeDataMovement, tableRebalanceLogger).getLeft();
- targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
- tierToInstancePartitionsMap, rebalanceConfig);
- } catch (Exception e) {
- onReturnFailure("Caught exception while re-calculating the target
assignment, aborting the rebalance", e,
- tableRebalanceLogger);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
- "Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ if (segmentsToMoveChanged) {
+ try {
+ // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
+ instancePartitionsMap =
+ getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
+ tableRebalanceLogger).getLeft();
+ tierToInstancePartitionsMap =
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
+ minimizeDataMovement, tableRebalanceLogger).getLeft();
+ targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
+ tierToInstancePartitionsMap, rebalanceConfig);
+ } catch (Exception e) {
+ onReturnFailure("Caught exception while re-calculating the
target assignment, aborting the rebalance", e,
+ tableRebalanceLogger);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
+ "Caught exception while re-calculating the target
assignment: " + e, instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ }
+ } else {
+ tableRebalanceLogger.info(
+ "No state change found for segments to be moved,
re-calculating the target assignment based on the "
+ + "previous target assignment");
+ Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
+ // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
+ targetAssignment = new TreeMap<>(currentAssignment);
+ for (String segment : segmentsToMove) {
+ targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
}
- } else {
- tableRebalanceLogger.info(
- "No state change found for segments to be moved, re-calculating
the target assignment based on the "
- + "previous target assignment");
- Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
- // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
- targetAssignment = new TreeMap<>(currentAssignment);
- for (String segment : segmentsToMove) {
- targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
+
+ // Step 2: Handle force commit if flag is set, then recalculate if
force commit occurred
+ if (shouldForceCommit) {
+ nextAssignment =
+ getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, tableRebalanceLogger);
+ Set<String> consumingSegmentsToMoveNext =
getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+ if (!consumingSegmentsToMoveNext.isEmpty()) {
Review Comment:
Before batching was introduced, yes. With batching, a single step is not
guaranteed to include all of the segments to be moved, so a consuming segment
might not move at all in the first step.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]