somandal commented on code in PR #15891:
URL: https://github.com/apache/pinot/pull/15891#discussion_r2141249400
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +685,52 @@ public SuccessResponse deleteTenant(
@ApiOperation(value = "Rebalances all the tables that are part of the
tenant")
public TenantRebalanceResult rebalance(
@ApiParam(value = "Name of the tenant whose table are to be rebalanced",
required = true)
- @PathParam("tenantName") String tenantName, @ApiParam(required = true)
TenantRebalanceConfig config) {
+ @PathParam("tenantName") String tenantName,
+ @ApiParam(value = "Number of table rebalance jobs allowed to run at the
same time", required = true, example =
+ "1")
+ @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+ @ApiParam(value =
+ "Comma separated list of tables with type that should be included in
this tenant rebalance"
+ + " job. Leaving blank defaults to include all tables from the
tenant. Example: table1_REALTIME, "
+ + "table2_REALTIME",
+ example = "")
+ @QueryParam("includeTables") String allowTables,
+ @ApiParam(value =
+ "Comma separated list of tables with type that would be excluded in
this tenant rebalance"
+ + " job. These tables will be removed from includeTables (that
said, if a table appears in both list, "
+ + "it will be excluded). Example: table1_REALTIME,
table2_REALTIME",
+ example = "")
+ @QueryParam("excludeTables") String blockTables,
+ @ApiParam(value = "Show full rebalance results of each table in the
response", example = "false")
+ @QueryParam("verboseResult") Boolean verboseResult,
+ @ApiParam(name = "rebalanceConfig", value = "The rebalance config
applied to run every table", required = true)
+ TenantRebalanceConfig config) {
// TODO decide on if the tenant rebalance should be database aware or not
config.setTenantName(tenantName);
+ // Query params should override the config provided in the body, if present
+ if (degreeOfParallelism != null) {
+ config.setDegreeOfParallelism(degreeOfParallelism);
+ }
+ if (verboseResult != null) {
+ config.setVerboseResult(verboseResult);
+ }
+ if (allowTables != null) {
+ config.setIncludeTables(Arrays.stream(StringUtil.split(allowTables, ',',
0))
+ .map(s -> s.strip().replaceAll("^\"|\"$", ""))
+ .collect(Collectors.toSet()));
+ }
+ if (blockTables != null) {
+ config.setExcludeTables(Arrays.stream(StringUtil.split(blockTables, ',',
0))
+ .map(s -> s.strip().replaceAll("^\"|\"$", ""))
+ .collect(Collectors.toSet()));
+ }
+ if ((!config.getExcludeTables().isEmpty() ||
!config.getIncludeTables().isEmpty()) && (
+ !config.getParallelBlacklist().isEmpty() ||
!config.getParallelWhitelist().isEmpty())) {
+ throw new ControllerApplicationException(LOGGER,
+ "Bad usage by specifying both include/excludeTables and
parallelWhitelist/Blacklist at the same time."
+ + " The latter is a deprecated usage of this API.",
Review Comment:
Might be cleaner to have 2 booleans and reject the request when both of them are true:
```
boolean isParallelListSet = !config.getParallelBlacklist().isEmpty() ||
!config.getParallelWhitelist().isEmpty();
boolean isIncludeExcludeListSet = !config.getExcludeTables().isEmpty() ||
!config.getIncludeTables().isEmpty();
if (isParallelListSet && isIncludeExcludeListSet) {
...
}
```
Until it is actually deprecated, can we avoid adding this comment? Or should
you mark it as deprecated in this PR itself?
Instead, perhaps say something like "this combination of configs is not
supported; use either one or the other".
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +685,52 @@ public SuccessResponse deleteTenant(
@ApiOperation(value = "Rebalances all the tables that are part of the
tenant")
public TenantRebalanceResult rebalance(
@ApiParam(value = "Name of the tenant whose table are to be rebalanced",
required = true)
- @PathParam("tenantName") String tenantName, @ApiParam(required = true)
TenantRebalanceConfig config) {
+ @PathParam("tenantName") String tenantName,
+ @ApiParam(value = "Number of table rebalance jobs allowed to run at the
same time", required = true, example =
+ "1")
+ @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+ @ApiParam(value =
+ "Comma separated list of tables with type that should be included in
this tenant rebalance"
+ + " job. Leaving blank defaults to include all tables from the
tenant. Example: table1_REALTIME, "
+ + "table2_REALTIME",
+ example = "")
+ @QueryParam("includeTables") String allowTables,
+ @ApiParam(value =
+ "Comma separated list of tables with type that would be excluded in
this tenant rebalance"
+ + " job. These tables will be removed from includeTables (that
said, if a table appears in both list, "
+ + "it will be excluded). Example: table1_REALTIME,
table2_REALTIME",
+ example = "")
+ @QueryParam("excludeTables") String blockTables,
+ @ApiParam(value = "Show full rebalance results of each table in the
response", example = "false")
+ @QueryParam("verboseResult") Boolean verboseResult,
+ @ApiParam(name = "rebalanceConfig", value = "The rebalance config
applied to run every table", required = true)
+ TenantRebalanceConfig config) {
// TODO decide on if the tenant rebalance should be database aware or not
config.setTenantName(tenantName);
+ // Query params should override the config provided in the body, if present
+ if (degreeOfParallelism != null) {
+ config.setDegreeOfParallelism(degreeOfParallelism);
+ }
+ if (verboseResult != null) {
+ config.setVerboseResult(verboseResult);
+ }
+ if (allowTables != null) {
+ config.setIncludeTables(Arrays.stream(StringUtil.split(allowTables, ',',
0))
+ .map(s -> s.strip().replaceAll("^\"|\"$", ""))
+ .collect(Collectors.toSet()));
+ }
+ if (blockTables != null) {
+ config.setExcludeTables(Arrays.stream(StringUtil.split(blockTables, ',',
0))
+ .map(s -> s.strip().replaceAll("^\"|\"$", ""))
+ .collect(Collectors.toSet()));
+ }
+ if ((!config.getExcludeTables().isEmpty() ||
!config.getIncludeTables().isEmpty()) && (
+ !config.getParallelBlacklist().isEmpty() ||
!config.getParallelWhitelist().isEmpty())) {
+ throw new ControllerApplicationException(LOGGER,
+ "Bad usage by specifying both include/excludeTables and
parallelWhitelist/Blacklist at the same time."
+ + " The latter is a deprecated usage of this API.",
+ Response.Status.BAD_REQUEST);
Review Comment:
Can you add `@ApiResponses` similar to the other APIs, if it makes sense?
e.g. from above:
```
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success", response =
InstancePartitions.class),
@ApiResponse(code = 400, message = "Failed to deserialize/validate the
instance partitions"),
@ApiResponse(code = 500, message = "Error updating the tenant")
}
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TenantRebalanceIntegrationTest.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+public class TenantRebalanceIntegrationTest extends
BaseHybridClusterIntegrationTest {
+
+ private String getRebalanceUrl() {
+ return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(),
"tenants", getServerTenant(),
+ "rebalance");
+ }
+
+ @Test
+ public void testDeprecatedParallelWhitelistBlacklistCompatibility()
Review Comment:
nit: let's remove the word "deprecated" from this PR until you actually deprecate
things, to avoid confusion.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager
tableRebalanceManager,
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ if (!config.getParallelWhitelist().isEmpty() ||
!config.getParallelBlacklist().isEmpty()) {
+ // If the parallel whitelist or blacklist is set, the old tenant
rebalance logic will be used
+ // TODO: Deprecate the support for this in the future
+ LOGGER.warn("Using the old tenant rebalance logic because parallel
whitelist or blacklist is set, "
+ + "which is a deprecated usage of this API.");
Review Comment:
nit: again, let's describe it as incompatible with the new configs rather than
deprecated. You can comment about deprecation once you actually deprecate it.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager
tableRebalanceManager,
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ if (!config.getParallelWhitelist().isEmpty() ||
!config.getParallelBlacklist().isEmpty()) {
+ // If the parallel whitelist or blacklist is set, the old tenant
rebalance logic will be used
+ // TODO: Deprecate the support for this in the future
+ LOGGER.warn("Using the old tenant rebalance logic because parallel
whitelist or blacklist is set, "
+ + "which is a deprecated usage of this API.");
+ return rebalanceWithParallelAndSequential(config);
+ }
+ return rebalanceWithIncludeExcludeTables(config);
+ }
+
+ private TenantRebalanceResult
rebalanceWithIncludeExcludeTables(TenantRebalanceConfig config) {
+ Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+ Set<String> tables = getTenantTables(config.getTenantName());
+ Set<String> includeTables = config.getIncludeTables();
+ if (!includeTables.isEmpty()) {
+ tables.retainAll(includeTables);
+ }
+ tables.removeAll(config.getExcludeTables());
+ tables.forEach(table -> {
+ try {
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(true);
+ dryRunResults.put(table,
+ _tableRebalanceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(), false));
+ } catch (TableNotFoundException | RebalanceInProgressException
exception) {
+ dryRunResults.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
+ null, null, null, null, null));
+ }
+ });
+
+ if (config.isDryRun()) {
+ return new TenantRebalanceResult(null, dryRunResults,
config.isVerboseResult());
+ }
+
+ String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+ TenantRebalanceObserver observer = new
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+ tables, _pinotHelixResourceManager);
+ observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null,
null);
+ ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config,
dryRunResults);
+ // ensure atleast 1 thread is created to run the sequential table
rebalance operations
+ int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+ try {
+ for (int i = 0; i < parallelism; i++) {
+ _executorService.submit(() -> {
+ while (true) {
+ String table = parallelQueue.poll();
+ if (table == null) {
+ break;
+ }
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
+ if (dryRunResults.get(table)
+ .getRebalanceSummaryResult()
+ .getSegmentInfo()
+ .getReplicationFactor()
+ .getExpectedValueAfterRebalance() == 1) {
+ rebalanceConfig.setMinAvailableReplicas(0);
Review Comment:
Can you add a TODO here to address the concern about someone setting
`downtime=true` and the need to wait for that? That way we don't lose track of
this in the code.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -28,6 +30,14 @@ public class TenantRebalanceResult {
private String _jobId;
private Map<String, RebalanceResult> _rebalanceTableResults;
+ @JsonCreator
+ public TenantRebalanceResult(
+ @JsonProperty("jobId") String jobId,
+ @JsonProperty("rebalanceTableResults") Map<String, RebalanceResult>
rebalanceTableResults) {
+ _jobId = jobId;
+ _rebalanceTableResults = rebalanceTableResults;
Review Comment:
Can you have this constructor delegate to the other constructor instead of setting these fields directly?
```
this(jobId, rebalanceTableResults, true);
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager
tableRebalanceManager,
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ if (!config.getParallelWhitelist().isEmpty() ||
!config.getParallelBlacklist().isEmpty()) {
+ // If the parallel whitelist or blacklist is set, the old tenant
rebalance logic will be used
+ // TODO: Deprecate the support for this in the future
+ LOGGER.warn("Using the old tenant rebalance logic because parallel
whitelist or blacklist is set, "
+ + "which is a deprecated usage of this API.");
+ return rebalanceWithParallelAndSequential(config);
+ }
+ return rebalanceWithIncludeExcludeTables(config);
+ }
+
+ private TenantRebalanceResult
rebalanceWithIncludeExcludeTables(TenantRebalanceConfig config) {
+ Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+ Set<String> tables = getTenantTables(config.getTenantName());
+ Set<String> includeTables = config.getIncludeTables();
+ if (!includeTables.isEmpty()) {
+ tables.retainAll(includeTables);
+ }
+ tables.removeAll(config.getExcludeTables());
+ tables.forEach(table -> {
+ try {
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(true);
+ dryRunResults.put(table,
+ _tableRebalanceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(), false));
+ } catch (TableNotFoundException | RebalanceInProgressException
exception) {
+ dryRunResults.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
+ null, null, null, null, null));
+ }
+ });
+
+ if (config.isDryRun()) {
+ return new TenantRebalanceResult(null, dryRunResults,
config.isVerboseResult());
+ }
+
+ String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+ TenantRebalanceObserver observer = new
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+ tables, _pinotHelixResourceManager);
+ observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null,
null);
+ ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config,
dryRunResults);
+ // ensure atleast 1 thread is created to run the sequential table
rebalance operations
+ int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+ try {
+ for (int i = 0; i < parallelism; i++) {
+ _executorService.submit(() -> {
+ while (true) {
+ String table = parallelQueue.poll();
+ if (table == null) {
+ break;
+ }
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
+ if (dryRunResults.get(table)
+ .getRebalanceSummaryResult()
+ .getSegmentInfo()
+ .getReplicationFactor()
+ .getExpectedValueAfterRebalance() == 1) {
+ rebalanceConfig.setMinAvailableReplicas(0);
+ }
+ rebalanceTable(table, rebalanceConfig,
dryRunResults.get(table).getJobId(), observer);
+ }
+ observer.onSuccess(String.format("Successfully rebalanced tenant
%s.", config.getTenantName()));
+ });
+ }
+ } catch (Exception exception) {
+ observer.onError(String.format("Failed to rebalance the tenant %s.
Cause: %s", config.getTenantName(),
+ exception.getMessage()));
+ }
+
+ // Prepare tenant rebalance result to return
+ Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+ for (String table : dryRunResults.keySet()) {
+ RebalanceResult result = dryRunResults.get(table);
+ if (result.getStatus() == RebalanceResult.Status.DONE) {
+ rebalanceResults.put(table, new RebalanceResult(result.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
+ "In progress, check controller task status for the",
result.getInstanceAssignment(),
+ result.getTierInstanceAssignment(), result.getSegmentAssignment(),
result.getPreChecksResult(),
+ result.getRebalanceSummaryResult()));
+ } else {
+ rebalanceResults.put(table, dryRunResults.get(table));
+ }
+ }
+ return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults,
config.isVerboseResult());
+ }
+
+ // This method implements the old logic for tenant rebalance using parallel
whitelist/blacklist.
+ // Usage of this method is now deprecated and will be removed in the future.
Review Comment:
nit: let's wait to add this part of the comment until you actually
deprecate it. Otherwise it gets confusing to follow, since it isn't actually
marked as deprecated yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]