yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2152152167
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
Review Comment:
Looks like these are also the default values. Instead of hardcoding them
here separately, could you refactor these out into common constants and share
them across both the call sites?
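If the refactor goes through, the shared defaults might look something like this. Everything here is a suggestion, not existing Pinot code: the class and constant names are made up, and the parameter meanings are inferred from the `ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180)` call site.

```java
// Hypothetical holder for the shared force-commit defaults; names and
// placement are up to the author, only the values come from the call site.
final class ForceCommitDefaults {
  private ForceCommitDefaults() {
  }

  // Commit all segments in a single batch by default.
  static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
  // Assumed meaning: seconds between status checks.
  static final int DEFAULT_STATUS_CHECK_INTERVAL_SEC = 5;
  // Assumed meaning: overall timeout in seconds.
  static final int DEFAULT_STATUS_CHECK_TIMEOUT_SEC = 180;
}
```

Both call sites could then build the config via `ForceCommitBatchConfig.of(ForceCommitDefaults.DEFAULT_BATCH_SIZE, ...)` so the defaults can never drift apart.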
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -231,14 +237,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
             tableConfig.getRoutingConfig().getInstanceSelectorType());
+    boolean forceCommitBeforeMoved =
+        tableConfig.getTableType() == TableType.REALTIME && rebalanceConfig.isForceCommitBeforeMoved();
Review Comment:
The force commit config defaults to `false`, so if a user is explicitly
setting it to `true` for an `OFFLINE` table we should throw some sort of
validation error to the user informing them that this config is only applicable
to `REALTIME` tables instead of silently ignoring it IMO.
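A standalone sketch of the suggested validation, using simplified stand-in types rather than the actual Pinot classes (the method name and exception choice are illustrative):

```java
// Simplified stand-in for org.apache.pinot.spi.config.table.TableType.
enum TableType { OFFLINE, REALTIME }

final class RebalanceConfigValidator {
  // Reject the flag up front instead of silently ignoring it for OFFLINE tables.
  static void validateForceCommit(TableType tableType, boolean forceCommitBeforeMoved) {
    if (forceCommitBeforeMoved && tableType != TableType.REALTIME) {
      throw new IllegalArgumentException(
          "forceCommitBeforeMoved is only applicable to REALTIME tables, got: " + tableType);
    }
  }
}
```

The validation would run before any rebalance work starts, so the user gets an immediate error response rather than a no-op config.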
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
Review Comment:
I'm not too familiar with the realtime consumption side of things here - in
what scenarios would one replica be `CONSUMING` when another replica for the
same segment is `ONLINE` in the ideal state? I thought that scenario should
only have been possible in the external view, no?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
Review Comment:
I think we should fail the rebalance job in this case instead of simply
logging a warning and proceeding with the rest of the rebalance. If a user
configures the force commit rebalance config to `true`, it should be honored
and we shouldn't just ignore a failure (or timeout) during force commit IMO.
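Concretely, the catch block could propagate instead of warn. A standalone sketch of the fail-fast pattern (the waiter here is a stub that always times out, standing in for `waitUntilPrevBatchIsComplete`; names are illustrative):

```java
import java.util.concurrent.TimeoutException;

final class ForceCommitFailFast {
  // Stub for the real wait call; this sketch simulates a timeout.
  static void waitUntilPrevBatchIsComplete() throws TimeoutException {
    throw new TimeoutException("committed segments did not reach DONE in time");
  }

  // Wrap and rethrow so the rebalance job fails when the force commit
  // cannot be confirmed, instead of logging a warning and continuing.
  static void forceCommitAndWait() {
    try {
      waitUntilPrevBatchIsComplete();
    } catch (Exception e) {
      throw new RuntimeException("Force commit did not complete, aborting the rebalance", e);
    }
  }
}
```

In `TableRebalancer` the rethrown exception would then surface as a `RebalanceResult.Status.FAILED` result, matching how other mid-rebalance failures are reported.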
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
       // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
       ZNRecord idealStateRecord = idealState.getRecord();
-      if (idealStateRecord.getVersion() != expectedVersion) {
-        tableRebalanceLogger.info(
-            "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
-                + "assignment");
-        Map<String, Map<String, String>> oldAssignment = currentAssignment;
-        currentAssignment = idealStateRecord.getMapFields();
-        expectedVersion = idealStateRecord.getVersion();
-
-        // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
-        // same target instance state map for these segments to the new ideal state as the target assignment
-        boolean segmentsToMoveChanged = false;
-        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
-          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
-          // new added segments is based on the existing assignment
-          segmentsToMoveChanged = true;
-        } else {
-          for (String segment : segmentsToMove) {
-            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
-            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
-            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
-            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-              tableRebalanceLogger.info(
-                  "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
-                      + "assignment based on the new IdealState",
-                  oldInstanceStateMap, currentInstanceStateMap, segment);
-              segmentsToMoveChanged = true;
-              break;
+      Map<String, Map<String, String>> nextAssignment;
+      boolean needsRecalculation;
+      boolean shouldForceCommit = forceCommitBeforeMoved;
+
+      do {
+        needsRecalculation = false;
+
+        // Step 1: Handle version mismatch and recalculate if needed
+        if (idealStateRecord.getVersion() != expectedVersion) {
+          tableRebalanceLogger.info(
+              "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
+                  + "assignment");
+          Map<String, Map<String, String>> oldAssignment = currentAssignment;
+          currentAssignment = idealStateRecord.getMapFields();
+          expectedVersion = idealStateRecord.getVersion();
+
+          // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply
+          // the same target instance state map for these segments to the new ideal state as the target assignment
+          boolean segmentsToMoveChanged = false;
+          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+            // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment
+            // for new added segments is based on the existing assignment
+            segmentsToMoveChanged = true;
+          } else {
+            for (String segment : segmentsToMove) {
+              Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+              Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+              // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+              if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+                tableRebalanceLogger.info(
+                    "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
+                        + "assignment based on the new IdealState",
+                    oldInstanceStateMap, currentInstanceStateMap, segment);
+                segmentsToMoveChanged = true;
+                break;
+              }
             }
           }
-        }
-        if (segmentsToMoveChanged) {
-          try {
-            // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-            instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
-                    tableRebalanceLogger).getLeft();
-            tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
-                    minimizeDataMovement, tableRebalanceLogger).getLeft();
-            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
-                tierToInstancePartitionsMap, rebalanceConfig);
-          } catch (Exception e) {
-            onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
-                tableRebalanceLogger);
-            return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-                "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+          if (segmentsToMoveChanged) {
+            try {
+              // Re-calculate the instance partitions in case the instance configs changed during the rebalance
+              instancePartitionsMap =
+                  getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
+                      tableRebalanceLogger).getLeft();
+              tierToInstancePartitionsMap =
+                  getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
+                      minimizeDataMovement, tableRebalanceLogger).getLeft();
+              targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
+                  tierToInstancePartitionsMap, rebalanceConfig);
+            } catch (Exception e) {
+              onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
+                  tableRebalanceLogger);
+              return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+                  "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
+                  tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+            }
+          } else {
+            tableRebalanceLogger.info(
+                "No state change found for segments to be moved, re-calculating the target assignment based on the "
+                    + "previous target assignment");
+            Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
+            // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
+            targetAssignment = new TreeMap<>(currentAssignment);
+            for (String segment : segmentsToMove) {
+              targetAssignment.put(segment, oldTargetAssignment.get(segment));
+            }
           }
-        } else {
-          tableRebalanceLogger.info(
-              "No state change found for segments to be moved, re-calculating the target assignment based on the "
-                  + "previous target assignment");
-          Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
-          // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
-          targetAssignment = new TreeMap<>(currentAssignment);
-          for (String segment : segmentsToMove) {
-            targetAssignment.put(segment, oldTargetAssignment.get(segment));
+        }
+
+        // Step 2: Handle force commit if flag is set, then recalculate if force commit occurred
+        if (shouldForceCommit) {
+          nextAssignment =
+              getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
                  lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger);
+          Set<String> consumingSegmentsToMoveNext = getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+          if (!consumingSegmentsToMoveNext.isEmpty()) {
+            tableRebalanceLogger.info("Force committing {} consuming segments before moving them",
+                consumingSegmentsToMoveNext.size());
+            needsRecalculation = true;
+            _tableRebalanceObserver.onTrigger(
+                TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER, null, null,
+                null);
+            idealState =
+                forceCommitConsumingSegmentsAndWait(tableNameWithType, consumingSegmentsToMoveNext,
+                    tableRebalanceLogger);
+            idealStateRecord = idealState.getRecord();
+            _tableRebalanceObserver.onTrigger(
+                TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_END_TRIGGER, null, null,
+                new TableRebalanceObserver.RebalanceContext(consumingSegmentsToMoveNext.size()));
          }
+          shouldForceCommit = false; // Only attempt force commit once
        }
-      }
+      } while (needsRecalculation);
Review Comment:
I'm not sure I understand this do-while loop on `needsRecalculation` - is it
to recalculate the `currentAssignment`, `targetAssignment`, `expectedVersion`
etc. if the force commit is done? But since we're using an additional
`shouldForceCommit` local variable to ensure that the force commit part can
only be done once, we're essentially only doing Step 1 -> Step 2 -> Step 1
right (and no further iterations should be possible)? If that's the right
understanding, I think it might be cleaner and more readable to refactor the
ideal state version mismatch handling out into a separate function that can be
called after Step 2 if needed instead of having this do-while loop. What do you
think?
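To make the suggested refactor concrete, here is an illustrative control-flow sketch: the version-mismatch handling becomes a helper that is called linearly (Step 1, then Step 2, then Step 1 once more) instead of looping. All names and the counter are made up for illustration; they are not the actual `TableRebalancer` members.

```java
// Control-flow sketch only: the helper stands in for re-reading the IdealState
// and recomputing currentAssignment/targetAssignment/expectedVersion.
final class RebalanceFlowSketch {
  int recalculations = 0;

  void recalculateTargetIfIdealStateChanged() {
    recalculations++;
  }

  void run(boolean forceCommitBeforeMoved, boolean hasMovingConsumingSegments) {
    recalculateTargetIfIdealStateChanged();     // Step 1
    if (forceCommitBeforeMoved && hasMovingConsumingSegments) {
      // Step 2: force commit the moving CONSUMING segments, then re-run
      // Step 1 exactly once -- no loop needed since this happens at most once.
      recalculateTargetIfIdealStateChanged();
    }
  }
}
```

With this shape the "at most Step 1 -> Step 2 -> Step 1" invariant is visible in the structure itself rather than enforced by a flag inside a do-while.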
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
       // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
       ZNRecord idealStateRecord = idealState.getRecord();
-      if (idealStateRecord.getVersion() != expectedVersion) {
-        tableRebalanceLogger.info(
-            "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
-                + "assignment");
-        Map<String, Map<String, String>> oldAssignment = currentAssignment;
-        currentAssignment = idealStateRecord.getMapFields();
-        expectedVersion = idealStateRecord.getVersion();
-
-        // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
-        // same target instance state map for these segments to the new ideal state as the target assignment
-        boolean segmentsToMoveChanged = false;
-        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
-          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
-          // new added segments is based on the existing assignment
-          segmentsToMoveChanged = true;
-        } else {
-          for (String segment : segmentsToMove) {
-            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
-            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
-            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
-            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-              tableRebalanceLogger.info(
-                  "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
-                      + "assignment based on the new IdealState",
-                  oldInstanceStateMap, currentInstanceStateMap, segment);
-              segmentsToMoveChanged = true;
-              break;
+      Map<String, Map<String, String>> nextAssignment;
+      boolean needsRecalculation;
+      boolean shouldForceCommit = forceCommitBeforeMoved;
+
+      do {
+        needsRecalculation = false;
+
+        // Step 1: Handle version mismatch and recalculate if needed
+        if (idealStateRecord.getVersion() != expectedVersion) {
+          tableRebalanceLogger.info(
+              "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
+                  + "assignment");
+          Map<String, Map<String, String>> oldAssignment = currentAssignment;
+          currentAssignment = idealStateRecord.getMapFields();
+          expectedVersion = idealStateRecord.getVersion();
+
+          // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply
+          // the same target instance state map for these segments to the new ideal state as the target assignment
+          boolean segmentsToMoveChanged = false;
+          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+            // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment
+            // for new added segments is based on the existing assignment
+            segmentsToMoveChanged = true;
+          } else {
+            for (String segment : segmentsToMove) {
+              Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+              Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+              // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+              if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+                tableRebalanceLogger.info(
+                    "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
+                        + "assignment based on the new IdealState",
+                    oldInstanceStateMap, currentInstanceStateMap, segment);
+                segmentsToMoveChanged = true;
+                break;
+              }
             }
           }
-        }
-        if (segmentsToMoveChanged) {
-          try {
-            // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-            instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
-                    tableRebalanceLogger).getLeft();
-            tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
-                    minimizeDataMovement, tableRebalanceLogger).getLeft();
-            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
-                tierToInstancePartitionsMap, rebalanceConfig);
-          } catch (Exception e) {
-            onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
-                tableRebalanceLogger);
-            return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-                "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+          if (segmentsToMoveChanged) {
+            try {
+              // Re-calculate the instance partitions in case the instance configs changed during the rebalance
+              instancePartitionsMap =
+                  getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, minimizeDataMovement,
+                      tableRebalanceLogger).getLeft();
+              tierToInstancePartitionsMap =
+                  getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
+                      minimizeDataMovement, tableRebalanceLogger).getLeft();
+              targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
+                  tierToInstancePartitionsMap, rebalanceConfig);
+            } catch (Exception e) {
+              onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
+                  tableRebalanceLogger);
+              return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+                  "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
+                  tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
+            }
+          } else {
+            tableRebalanceLogger.info(
+                "No state change found for segments to be moved, re-calculating the target assignment based on the "
+                    + "previous target assignment");
+            Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
+            // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
+            targetAssignment = new TreeMap<>(currentAssignment);
+            for (String segment : segmentsToMove) {
+              targetAssignment.put(segment, oldTargetAssignment.get(segment));
+            }
          }
-        } else {
-          tableRebalanceLogger.info(
-              "No state change found for segments to be moved, re-calculating the target assignment based on the "
-                  + "previous target assignment");
-          Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
-          // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
-          targetAssignment = new TreeMap<>(currentAssignment);
-          for (String segment : segmentsToMove) {
-            targetAssignment.put(segment, oldTargetAssignment.get(segment));
+        }
+
+        // Step 2: Handle force commit if flag is set, then recalculate if force commit occurred
+        if (shouldForceCommit) {
+          nextAssignment =
+              getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
                  lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger);
+          Set<String> consumingSegmentsToMoveNext = getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+          if (!consumingSegmentsToMoveNext.isEmpty()) {
Review Comment:
We're doing this before each rebalance step, but if I understand correctly,
for non-upsert realtime tables this will actually only happen in the first
rebalance step because in subsequent steps the new consuming segments would
already have been created on the right target servers (and
`consumingSegmentsToMoveNext` would be empty here in subsequent rebalance
steps) based on the instance partitions change, correct?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }
Review Comment:
Unrelated to this PR, but IMO we should either deprecate the CLI
command-based table rebalance or else refactor it to use the controller API to
trigger the table rebalance (and deprecate the ZK-based direct trigger). It
doesn't make sense to maintain these two separate pathways to rebalances and
these differences will only accumulate if we don't consolidate it into the
controller path.
cc - @somandal @Jackie-Jiang
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -151,6 +151,26 @@ public void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentS
           updatedStatsInZk = true;
         }
         break;
+      case FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER:
+        LOGGER.info("triggered force commit for consuming segments for table: {}",
Review Comment:
nit: this log seems redundant because we're already logging something
similar in `TableRebalancer`.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
+    if (_pinotLLCRealtimeSegmentManager != null) {
+      ForceCommitBatchConfig forceCommitBatchConfig =
+          ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+      segmentsToCommit = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
+          segmentsToCommit == null ? null : StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
+          forceCommitBatchConfig);
+      try {
+        // Wait until all committed segments have their status set to DONE.
+        // Even for pauseless table, we wait until the segment has been uploaded (status DONE). Because we cannot
+        // guarantee there will be available peers for the new instance to download (e.g. the only available replica
+        // during the rebalance be the one who's committing, which has CONSUMING in EV), which may lead to download
+        // timeout and essentially segment ERROR. Furthermore, we need to wait until EV-IS converge anyway, and that
+        // happens only after the committing segment status is set to DONE.
+        _pinotLLCRealtimeSegmentManager.waitUntilPrevBatchIsComplete(tableNameWithType, segmentsToCommit,
+            forceCommitBatchConfig);
+      } catch (Exception e) {
+        tableRebalanceLogger.warn("Failed to wait for previous batch to complete", e);
+      }
+    } else {
+      tableRebalanceLogger.warn(
+          "PinotLLCRealtimeSegmentManager is not initialized, cannot force commit consuming segments");
+    }
Review Comment:
From what I can tell, this should only be possible if someone enables the
force commit rebalance config when triggering a rebalance using the
`RebalanceTable` CLI command right? IMO we should add a validation to
`PinotTableRebalancer` and fail if the config is set with a message saying that
it's only supported via the controller API (and also update this path to throw
instead of logging a warning).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -665,6 +665,8 @@ public RebalanceResult rebalance(
@QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
      @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
      @QueryParam("updateTargetTier") boolean updateTargetTier,
+      @ApiParam(value = "Do force commit on consuming segments before they are rebalanced") @DefaultValue("false")
+      @QueryParam("forceCommitBeforeMoved") boolean forceCommitBeforeMoved,
Review Comment:
I'm not sure `forceCommitBeforeMoved` is the best name for this parameter -
how about simply `forceCommitConsumingSegments` instead? And we could either
keep the current description or enhance it to mention that multiple sets of
consuming segments could be force committed depending on the number of steps in
the rebalance.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1970,4 +2024,50 @@ static class SingleSegmentAssignment {
_availableInstances = availableInstances;
}
}
+
+ @VisibleForTesting
+  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment) {
+    Set<String> movingConsumingSegments = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      if (targetInstanceStateMap != null && targetInstanceStateMap.values().stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && targetInstanceStateMap.values().stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING))) {
+        if (!currentInstanceStateMap.keySet().equals(targetInstanceStateMap.keySet())) {
+          movingConsumingSegments.add(segmentName);
+        }
+      }
+    }
+    return movingConsumingSegments;
+  }
+
+  private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
+      @Nullable Set<String> segmentsToCommit, Logger tableRebalanceLogger) {
Review Comment:
Why is `segmentsToCommit` marked as `Nullable`? `getMovingConsumingSegments`
can only return an empty set and not `null` right?
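Right - a standalone restatement of the method with plain `String` states ("ONLINE"/"CONSUMING" instead of `SegmentStateModel`) shows that every code path returns a set, so the only caller-supplied `null` would have to come from elsewhere:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified restatement of getMovingConsumingSegments, only to illustrate
// that it returns an empty set (never null) when no consuming segment moves.
final class MovingSegmentsSketch {
  static Set<String> getMovingConsumingSegments(Map<String, Map<String, String>> current,
      Map<String, Map<String, String>> target) {
    Set<String> moving = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : current.entrySet()) {
      Map<String, String> targetStates = target.get(entry.getKey());
      // A segment is "moving consuming" when its target has only CONSUMING
      // replicas (no ONLINE) and the hosting instances change.
      if (targetStates != null
          && targetStates.values().stream().noneMatch("ONLINE"::equals)
          && targetStates.values().stream().anyMatch("CONSUMING"::equals)
          && !entry.getValue().keySet().equals(targetStates.keySet())) {
        moving.add(entry.getKey());
      }
    }
    return moving;
  }
}
```

Given that, the `@Nullable` annotation on `segmentsToCommit` looks unnecessary for this call path and could be dropped along with the `== null` branch.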
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -473,6 +492,10 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     // StrictReplicaGroupAssignment::rebalanceTable() and similar limitations apply here as well
     Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>();
+    boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+    PartitionIdFetcher partitionIdFetcher =
+        new PartitionIdFetcherImpl(tableNameWithType, TableConfigUtils.getPartitionColumn(tableConfig), _helixManager,
+            isStrictRealtimeSegmentAssignment);
     // We repeat the following steps until the target assignment is reached:
     // 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
     //    timeout.
Review Comment:
Could we update these comments as well? We should include the force commit
related changes from this PR as well as your previous EV-IS convergence dynamic
timeout change (the first point here still says that rebalance fails if it
doesn't converge within the timeout).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]