This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 44dd108be8b Add a rebalance option to disable the summary (#16615)
44dd108be8b is described below
commit 44dd108be8b2074949803aecf8cb92a3edeb50c0
Author: Sonam Mandal <[email protected]>
AuthorDate: Sat Aug 16 18:39:30 2025 -0700
Add a rebalance option to disable the summary (#16615)
* Add a rebalance option to disable the summary
* Address review comments
---
.../api/resources/PinotTableRestletResource.java | 3 ++
.../core/rebalance/DefaultRebalancePreChecker.java | 25 ++++++++-----
.../helix/core/rebalance/RebalanceConfig.java | 27 ++++++++++----
.../helix/core/rebalance/TableRebalancer.java | 42 +++++++++++++++-------
.../TableRebalancerClusterStatelessTest.java | 25 +++++++++++++
5 files changed, 95 insertions(+), 27 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 1a60c39bd74..898fd26e7b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -704,6 +704,8 @@ public class PinotTableRestletResource {
boolean dryRun,
@ApiParam(value = "Whether to enable pre-checks for table, must be in
dry-run mode to enable")
@DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
+ @ApiParam(value = "Whether to disable summary calculation")
+ @DefaultValue("false") @QueryParam("disableSummary") boolean
disableSummary,
@ApiParam(value = "Whether to reassign instances before reassigning
segments") @DefaultValue("true")
@QueryParam("reassignInstances") boolean reassignInstances,
@ApiParam(value = "Whether to reassign CONSUMING segments for real-time
table") @DefaultValue("true")
@@ -781,6 +783,7 @@ public class PinotTableRestletResource {
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
rebalanceConfig.setPreChecks(preChecks);
+ rebalanceConfig.setDisableSummary(disableSummary);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index fe782f4bd3d..7b67774c16c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -412,16 +412,23 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker {
}
// --- Batch size per server recommendation check using summary ---
- int maxSegmentsToAddOnServer =
rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer();
- int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
- if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) {
- if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
- || batchSizePerServer > RECOMMENDED_BATCH_SIZE) {
- pass = false;
- warnings.add("Number of segments to add to a single server (" +
maxSegmentsToAddOnServer + ") is high (>"
- + SEGMENT_ADD_THRESHOLD + "). It is recommended to set
batchSizePerServer to " + RECOMMENDED_BATCH_SIZE
- + " or lower to avoid excessive load on servers.");
+ if (rebalanceSummaryResult != null) {
+ int maxSegmentsToAddOnServer =
rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer();
+ int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+ if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) {
+ if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
+ || batchSizePerServer > RECOMMENDED_BATCH_SIZE) {
+ pass = false;
+ warnings.add("Number of segments to add to a single server (" +
maxSegmentsToAddOnServer + ") is high (>"
+ + SEGMENT_ADD_THRESHOLD + "). It is recommended to set
batchSizePerServer to " + RECOMMENDED_BATCH_SIZE
+ + " or lower to avoid excessive load on servers.");
+ }
}
+ } else {
+ // Rebalance summary should not be null when pre-checks are enabled
unless an exception was thrown while
+ // calculating it
+ pass = false;
+ warnings.add("Could not assess batchSizePerServer recommendation as
rebalance summary could not be calculated");
}
return pass ? RebalancePreCheckerResult.pass("All rebalance parameters
look good")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index de6cdb7ea07..bfb7865d48e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -39,11 +39,16 @@ public class RebalanceConfig {
private boolean _dryRun = false;
// Whether to perform pre-checks for rebalance. This only returns the status
of each pre-check and does not fail
- // rebalance
+ // rebalance. Summary is required to calculate pre-checks, so if
'disableSummary=true', it will be reset to false
@JsonProperty("preChecks")
@ApiModelProperty(example = "false")
private boolean _preChecks = false;
+ // Whether to disable the summary or not. If set to true the summary will
not be calculated
+ @JsonProperty("disableSummary")
+ @ApiModelProperty(example = "false")
+ private boolean _disableSummary = false;
+
// Whether to reassign instances before reassigning segments
@JsonProperty("reassignInstances")
@ApiModelProperty(example = "false")
@@ -186,6 +191,14 @@ public class RebalanceConfig {
_preChecks = preChecks;
}
+ public boolean isDisableSummary() {
+ return _disableSummary;
+ }
+
+ public void setDisableSummary(boolean disableSummary) {
+ _disableSummary = disableSummary;
+ }
+
public boolean isReassignInstances() {
return _reassignInstances;
}
@@ -366,9 +379,9 @@ public class RebalanceConfig {
@Override
public String toString() {
- return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks + ", _reassignInstances="
- + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ",
_minimizeDataMovement="
- + _minimizeDataMovement + ", _bootstrap=" + _bootstrap + ",
_downtime=" + _downtime
+ return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks + ", _disableSummary="
+ + _disableSummary + ", _reassignInstances=" + _reassignInstances + ",
_includeConsuming=" + _includeConsuming
+ + ", _minimizeDataMovement=" + _minimizeDataMovement + ", _bootstrap="
+ _bootstrap + ", _downtime=" + _downtime
+ ", _allowPeerDownloadDataLoss=" + _allowPeerDownloadDataLoss + ",
_minAvailableReplicas="
+ _minAvailableReplicas + ", _bestEfforts=" + _bestEfforts + ",
batchSizePerServer="
+ _batchSizePerServer + ", _externalViewCheckIntervalInMs=" +
_externalViewCheckIntervalInMs
@@ -382,8 +395,9 @@ public class RebalanceConfig {
}
public String toQueryString() {
- return "dryRun=" + _dryRun + "&preChecks=" + _preChecks +
"&reassignInstances=" + _reassignInstances
- + "&includeConsuming=" + _includeConsuming + "&bootstrap=" +
_bootstrap + "&downtime=" + _downtime
+ return "dryRun=" + _dryRun + "&preChecks=" + _preChecks +
"&disableSummary=" + _disableSummary
+ + "&reassignInstances=" + _reassignInstances + "&includeConsuming=" +
_includeConsuming
+ + "&bootstrap=" + _bootstrap + "&downtime=" + _downtime
+ "&allowPeerDownloadDataLoss=" + _allowPeerDownloadDataLoss +
"&minAvailableReplicas=" + _minAvailableReplicas
+ "&bestEfforts=" + _bestEfforts + "&minimizeDataMovement=" +
_minimizeDataMovement.name()
+ "&batchSizePerServer=" + _batchSizePerServer
@@ -402,6 +416,7 @@ public class RebalanceConfig {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
rc._preChecks = cfg._preChecks;
+ rc._disableSummary = cfg._disableSummary;
rc._reassignInstances = cfg._reassignInstances;
rc._includeConsuming = cfg._includeConsuming;
rc._bootstrap = cfg._bootstrap;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 014327e68a9..2edd6042025 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -221,6 +221,7 @@ public class TableRebalancer {
}
boolean dryRun = rebalanceConfig.isDryRun();
boolean preChecks = rebalanceConfig.isPreChecks();
+ boolean disableSummary = rebalanceConfig.isDisableSummary();
boolean reassignInstances = rebalanceConfig.isReassignInstances();
boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
boolean bootstrap = rebalanceConfig.isBootstrap();
@@ -244,15 +245,15 @@ public class TableRebalancer {
forceCommit = false;
}
tableRebalanceLogger.info(
- "Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances:
{}, "
+ "Start rebalancing with dryRun: {}, preChecks: {}, disableSummary: {},
reassignInstances: {}, "
+ "includeConsuming: {}, bootstrap: {}, downtime: {},
allowPeerDownloadDataLoss: {}, "
+ "minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup:
{}, lowDiskMode: {}, bestEfforts: {}, "
+ "batchSizePerServer: {}, externalViewCheckIntervalInMs: {},
externalViewStabilizationTimeoutInMs: {}, "
+ "minimizeDataMovement: {}, forceCommit: {},
forceCommitBatchSize: {}, "
+ "forceCommitBatchStatusCheckIntervalMs: {},
forceCommitBatchStatusCheckTimeoutMs: {}",
- dryRun, preChecks, reassignInstances, includeConsuming, bootstrap,
downtime, allowPeerDownloadDataLoss,
- minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup,
lowDiskMode, bestEfforts, batchSizePerServer,
- externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs,
minimizeDataMovement,
+ dryRun, preChecks, disableSummary, reassignInstances,
includeConsuming, bootstrap, downtime,
+ allowPeerDownloadDataLoss, minReplicasToKeepUpForNoDowntime,
enableStrictReplicaGroup, lowDiskMode, bestEfforts,
+ batchSizePerServer, externalViewCheckIntervalInMs,
externalViewStabilizationTimeoutInMs, minimizeDataMovement,
forceCommit, rebalanceConfig.getForceCommitBatchSize(),
rebalanceConfig.getForceCommitBatchStatusCheckIntervalMs(),
rebalanceConfig.getForceCommitBatchStatusCheckTimeoutMs());
@@ -265,6 +266,12 @@ public class TableRebalancer {
null);
}
+ if (preChecks && disableSummary) {
+ tableRebalanceLogger.warn("disableSummary must be set to false to enable
preChecks, but was set to true. "
+ + "Setting to false, as summary calculation is needed for
preChecks");
+ disableSummary = false;
+ }
+
// Fetch ideal state
PropertyKey idealStatePropertyKey =
_helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
IdealState currentIdealState;
@@ -355,18 +362,29 @@ public class TableRebalancer {
// Calculate summary here itself so that even if the table is already
balanced, the caller can verify whether that
// is expected or not based on the summary results
- RebalanceSummaryResult summaryResult =
- calculateDryRunSummary(currentAssignment, targetAssignment,
tableNameWithType, tableSubTypeSizeDetails,
- tableConfig, tableRebalanceLogger);
+ RebalanceSummaryResult summaryResult = null;
+ if (!disableSummary) {
+ try {
+ summaryResult = calculateRebalanceSummary(currentAssignment,
targetAssignment, tableNameWithType,
+ tableSubTypeSizeDetails, tableConfig, tableRebalanceLogger);
+ } catch (Exception e) {
+ tableRebalanceLogger.warn("Caught exception while trying to calculate
the rebalance summary, skipping summary "
+ + "calculation", e);
+ }
+ }
if (preChecks) {
if (_rebalancePreChecker == null) {
tableRebalanceLogger.warn("Pre-checks are enabled but the pre-checker
is not set, skipping pre-checks");
} else {
- RebalancePreChecker.PreCheckContext preCheckContext =
- new RebalancePreChecker.PreCheckContext(rebalanceJobId,
tableNameWithType, tableConfig, currentAssignment,
- targetAssignment, tableSubTypeSizeDetails, rebalanceConfig,
summaryResult);
- preChecksResult = _rebalancePreChecker.check(preCheckContext);
+ try {
+ RebalancePreChecker.PreCheckContext preCheckContext =
+ new RebalancePreChecker.PreCheckContext(rebalanceJobId,
tableNameWithType, tableConfig, currentAssignment,
+ targetAssignment, tableSubTypeSizeDetails, rebalanceConfig,
summaryResult);
+ preChecksResult = _rebalancePreChecker.check(preCheckContext);
+ } catch (Exception e) {
+ tableRebalanceLogger.warn("Caught exception while trying to run the
rebalance pre-checks, skipping", e);
+ }
}
}
@@ -802,7 +820,7 @@ public class TableRebalancer {
return tableSizeDetails == null ? -1 :
tableSizeDetails._reportedSizePerReplicaInBytes;
}
- private RebalanceSummaryResult calculateDryRunSummary(Map<String,
Map<String, String>> currentAssignment,
+ private RebalanceSummaryResult calculateRebalanceSummary(Map<String,
Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, String
tableNameWithType,
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails,
TableConfig tableConfig,
Logger tableRebalanceLogger) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 599ec4a2801..ffb2b6e518e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -657,6 +657,31 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
assertNull(rebalanceResult.getRebalanceSummaryResult());
assertNull(rebalanceResult.getPreChecksResult());
+ // Try pre-checks mode with disableSummary set - this should work and
summary should still be returned
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setPreChecks(true);
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setDisableSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertNotNull(rebalanceResult.getRebalanceSummaryResult());
+ assertNotNull(rebalanceResult.getPreChecksResult());
+
+ // Try dry-run mode with disableSummary set - this should work and
summary should not be returned
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setDisableSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+ // Try rebalance with disableSummary set - this should work and summary
should not be returned
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDisableSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
for (int i = 0; i < numServers; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]