yashmayya commented on code in PR #15990:
URL: https://github.com/apache/pinot/pull/15990#discussion_r2136901895
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -783,29 +781,7 @@ public List<String> cancelRebalance(
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
-    List<String> cancelledJobIds = new ArrayList<>();
-    boolean updated =
-        _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE,
-            jobMetadata -> {
-              String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
-              try {
-                String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
-                TableRebalanceProgressStats jobStats =
-                    JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
-                if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
-                  return;
-                }
-                cancelledJobIds.add(jobId);
-                LOGGER.info("Cancel rebalance job: {} for table: {}", jobId, tableNameWithType);
-                jobStats.setStatus(RebalanceResult.Status.CANCELLED);
-                jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-                    JsonUtils.objectToString(jobStats));
-              } catch (Exception e) {
-                LOGGER.error("Failed to cancel rebalance job: {} for table: {}", jobId, tableNameWithType, e);
-              }
-            });
-    LOGGER.info("Tried to cancel existing jobs at best effort and done: {}", updated);
-    return cancelledJobIds;
+    return _tableRebalanceManager.cancelRebalance(tableNameWithType);
   }
@GET
Review Comment:
Yeah, that makes sense. We should eventually centralize all of the rebalance logic in the manager class. I've moved the status retrieval method over as well.
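The cancellation logic that moved into the manager comes down to three steps: walk the table's rebalance jobs, flip every `IN_PROGRESS` job to `CANCELLED`, and return the IDs that were flipped. Below is a minimal in-memory sketch of that idea. The `CancelRebalanceSketch` class, the `Job` type, the simplified `Status` enum, and the plain `Map` standing in for the ZK job metadata are all hypothetical and are not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained model of TableRebalanceManager#cancelRebalance.
// A map of jobId -> Job stands in for the table's rebalance job metadata in ZK,
// and Status is a simplified stand-in for RebalanceResult.Status.
class CancelRebalanceSketch {
  enum Status { IN_PROGRESS, DONE, FAILED, CANCELLED }

  static final class Job {
    final String jobId;
    Status status;

    Job(String jobId, Status status) {
      this.jobId = jobId;
      this.status = status;
    }
  }

  // Flips every IN_PROGRESS job to CANCELLED and returns the IDs of the jobs that were changed.
  // Jobs in any other state are skipped, like the early return in the diff.
  static List<String> cancelRebalance(Map<String, Job> jobsForTable) {
    List<String> cancelledJobIds = new ArrayList<>();
    for (Job job : jobsForTable.values()) {
      if (job.status != Status.IN_PROGRESS) {
        continue;
      }
      job.status = Status.CANCELLED;
      // Record the ID only after the status update, as the new manager code does.
      cancelledJobIds.add(job.jobId);
    }
    return cancelledJobIds;
  }

  public static void main(String[] args) {
    Map<String, Job> jobs = new LinkedHashMap<>();
    jobs.put("job-1", new Job("job-1", Status.DONE));
    jobs.put("job-2", new Job("job-2", Status.IN_PROGRESS));
    System.out.println(cancelRebalance(jobs)); // prints [job-2]
    System.out.println(jobs.get("job-2").status); // prints CANCELLED
  }
}
```

Compared with the removed resource code, the manager version adds the job ID to the result only after the updated stats have been written back. As a result, a job whose updated stats fail to serialize is no longer reported as cancelled.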
##########
pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java:
##########
@@ -55,7 +57,12 @@ public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun
   public RebalanceResult rebalance(String tableNameWithType) {
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
-    return new TableRebalancer(_helixManager).rebalance(tableConfig, _rebalanceConfig,
-        TableRebalancer.createUniqueRebalanceJobIdentifier());
+
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    ZkBasedTableRebalanceObserver rebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
+        TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), _propertyStore);
+
+    return new TableRebalancer(_helixManager, rebalanceObserver, null, null, null)
Review Comment:
Yeah, that's right, but there's no technical reason we can't enforce the in-progress rebalance check here as well, since we do have access to ZK. I've refactored some code to add the check here too. Thanks for calling this out!
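Enforcing the check outside the controller works because the check only needs read access to the rebalance job metadata. Below is a minimal sketch that assumes the metadata is available as a list of string maps. The key names and the `InProgressCheckSketch` class are hypothetical and do not match Pinot's actual ZK layout, where the status is stored inside the serialized progress stats.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the "already in progress" check only reads job metadata, so any component
// with access to the job store (the controller or a standalone tool) can run it.
// The string keys are illustrative stand-ins, not Pinot's actual metadata keys.
class InProgressCheckSketch {
  static final String TABLE_KEY = "tableNameWithType";
  static final String JOB_ID_KEY = "jobId";
  static final String STATUS_KEY = "status";

  // Returns the ID of an IN_PROGRESS rebalance job for the table, or null if there is none.
  static String rebalanceJobInProgress(String tableNameWithType, List<Map<String, String>> allJobs) {
    for (Map<String, String> job : allJobs) {
      if (tableNameWithType.equals(job.get(TABLE_KEY)) && "IN_PROGRESS".equals(job.get(STATUS_KEY))) {
        return job.get(JOB_ID_KEY);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<Map<String, String>> jobs = List.of(
        Map.of(TABLE_KEY, "orders_REALTIME", JOB_ID_KEY, "j1", STATUS_KEY, "DONE"),
        Map.of(TABLE_KEY, "orders_REALTIME", JOB_ID_KEY, "j2", STATUS_KEY, "IN_PROGRESS"));
    System.out.println(rebalanceJobInProgress("orders_REALTIME", jobs)); // prints j2
    System.out.println(rebalanceJobInProgress("users_OFFLINE", jobs)); // prints null
  }
}
```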
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1258,6 +1265,41 @@ private void checkRebalanceDryRunSummary(RebalanceResult rebalanceResult, Rebala
     assertEquals(numServersUnchanged, summaryResult.getServerInfo().getServersUnchanged().size());
   }
+  @Test
+  public void testDisallowMultipleConcurrentRebalancesOnSameTable() throws Exception {
+    // Manually write an IN_PROGRESS rebalance job to ZK instead of trying to collide multiple actual rebalance
+    // attempts which will be prone to race conditions and cause this test to be flaky. We only reject a rebalance job
+    // if there is an IN_PROGRESS rebalance job for the same table in ZK, so we could actually end up with more than
+    // one active rebalance job if both are started at the exact same time since the progress stats are written to ZK
+    // after some initial pre-checks are done. However, rebalances are idempotent, and we don't actually care too much
+    // about avoiding this edge case race condition as long as in most cases we are able to prevent users from
+    // triggering a rebalance for a table that already has an in-progress rebalance job.
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(getTableName());
+    TableRebalanceProgressStats progressStats = new TableRebalanceProgressStats();
+    progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE);
+    jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE,
+        prevJobMetadata -> true);
+
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME));
+    assertTrue(response.contains("Rebalance job is already in progress for table"));
Review Comment:
We don't currently set the status code based on the rebalance result, so as long as no exception is thrown, the returned status code is `200`. The only case handled separately is `TableNotFoundException`, which is caught and mapped to a `404`. I've made some changes to treat the rebalance-in-progress case similarly and return a `409 CONFLICT`.
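Below is a minimal sketch of the mapping described above. It assumes that the in-progress case surfaces as an exception in the same way `TableNotFoundException` does. The exception classes and the `RebalanceStatusCodeSketch` class are hypothetical stand-ins, and the real resource code may be structured differently. The status codes come from the JDK's `HttpURLConnection` constants.

```java
import java.net.HttpURLConnection;

// Hypothetical sketch of mapping rebalance outcomes to HTTP status codes.
// TableMissingException and RebalanceInProgressException are illustrative stand-ins,
// not the exception types used by the actual resource.
class RebalanceStatusCodeSketch {
  static class TableMissingException extends Exception {
    TableMissingException(String message) {
      super(message);
    }
  }

  static class RebalanceInProgressException extends Exception {
    RebalanceInProgressException(String message) {
      super(message);
    }
  }

  // The rebalance attempt: it either returns a result status or throws one of the two exceptions.
  interface RebalanceAttempt {
    String run() throws TableMissingException, RebalanceInProgressException;
  }

  // 404 for a missing table, 409 for a conflicting in-progress job, and 200 otherwise:
  // the contents of a normally returned result do not affect the status code.
  static int toHttpStatus(RebalanceAttempt attempt) {
    try {
      attempt.run();
      return HttpURLConnection.HTTP_OK;
    } catch (TableMissingException e) {
      return HttpURLConnection.HTTP_NOT_FOUND;
    } catch (RebalanceInProgressException e) {
      return HttpURLConnection.HTTP_CONFLICT;
    }
  }

  public static void main(String[] args) {
    System.out.println(toHttpStatus(() -> "FAILED")); // prints 200: a FAILED result still maps to 200
    System.out.println(toHttpStatus(() -> {
      throw new TableMissingException("Failed to find table config for table: foo_OFFLINE");
    })); // prints 404
    System.out.println(toHttpStatus(() -> {
      throw new RebalanceInProgressException("Rebalance job is already in progress for table: foo_OFFLINE");
    })); // prints 409
  }
}
```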
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller that respects the configuration
+   * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats in ZK
+   * @return result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats in ZK
+   * @return a CompletableFuture that will complete with the result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType,
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
+    }
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, trackRebalanceProgress);
+          } catch (TableNotFoundException e) {
+            // Should not happen since we already checked for table existence
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param tableConfig configuration for the table to rebalance
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param zkBasedTableRebalanceObserver observer to track rebalance progress in ZK
+   * @return a CompletableFuture that will complete with the result of the rebalance operation
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType, TableConfig tableConfig,
+      String rebalanceJobId, RebalanceConfig rebalanceConfig,
+      @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
+    return CompletableFuture.supplyAsync(
+        () -> rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig,
+            zkBasedTableRebalanceObserver),
+        _executorService);
+  }
+
+  @VisibleForTesting
+  RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
+    String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType);
+    if (rebalanceJobInProgress != null) {
+      String errorMsg = "Rebalance job is already in progress for table: " + tableNameWithType + ", jobId: "
+          + rebalanceJobInProgress + ". Please wait for the job to complete or cancel it before starting a new one.";
+      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, null);
+    }
+
+    Map<String, Set<String>> tierToSegmentsMap;
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
+    } else {
+      tierToSegmentsMap = null;
+    }
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_resourceManager.getHelixZkManager(), zkBasedTableRebalanceObserver, _controllerMetrics,
+            _rebalancePreChecker, _tableSizeReader);
+
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap);
+  }
+
+  /**
+   * Cancels ongoing rebalance jobs (if any) for the given table.
+   *
+   * @param tableNameWithType name of the table for which to cancel any ongoing rebalance job
+   * @return the list of job IDs that were cancelled
+   */
+  public List<String> cancelRebalance(String tableNameWithType) {
+    List<String> cancelledJobIds = new ArrayList<>();
+    _resourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE,
+        jobMetadata -> {
+          String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+          try {
+            String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+            TableRebalanceProgressStats jobStats =
+                JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+            if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+              return;
+            }
+
+            LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, tableNameWithType);
+            jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+            jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                JsonUtils.objectToString(jobStats));
+            cancelledJobIds.add(jobId);
+          } catch (Exception e) {
+            LOGGER.error("Failed to cancel rebalance job: {} for table: {}", jobId, tableNameWithType, e);
+          }
+        });
+    return cancelledJobIds;
+  }
+
+  /**
+   * Checks if there is an ongoing rebalance job for the given table.
+   *
+   * @param tableNameWithType name of the table to check for ongoing rebalance jobs
+   * @return jobId of the ongoing rebalance job if one exists, {@code null} otherwise
+   */
+  @Nullable
+  private String rebalanceJobInProgress(String tableNameWithType) {
Review Comment:
Good catch. This was an oversight on my part and not intentional. IMO dry-run rebalance requests should always be allowed. I'd made sure that all dry-run requests go through the synchronous path rather than the async one on our rebalance thread pool, so that they're never put into the waiting queue. However, I missed that we'd also be rejecting dry-run rebalance requests whenever there's an actual ongoing rebalance for the table. I've updated this now without adding the extra field to `RebalanceResult` (though I agree that this could be a future improvement).
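One way to express this fix is to let dry runs bypass the guard before the job metadata is even consulted. Below is a minimal sketch using a hypothetical `DryRunGuardSketch` class, with a `Supplier` standing in for the ZK lookup. The error text is copied from the diff, but the structure is an assumption and may differ from the updated PR.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the concurrency guard with a dry-run exemption.
// Dry runs only compute a target assignment without applying it, so they are never rejected.
// Taking the lookup as a Supplier lets them skip the job-metadata read entirely.
class DryRunGuardSketch {
  // Returns an error message if the rebalance should be rejected, or null if it may proceed.
  static String checkConcurrentRebalance(boolean dryRun, String tableNameWithType,
      Supplier<String> inProgressJobIdLookup) {
    if (dryRun) {
      return null;
    }
    String inProgressJobId = inProgressJobIdLookup.get();
    if (inProgressJobId == null) {
      return null;
    }
    return "Rebalance job is already in progress for table: " + tableNameWithType + ", jobId: " + inProgressJobId
        + ". Please wait for the job to complete or cancel it before starting a new one.";
  }

  public static void main(String[] args) {
    Supplier<String> busy = () -> "job-42";
    System.out.println(checkConcurrentRebalance(true, "orders_REALTIME", busy)); // prints null
    System.out.println(checkConcurrentRebalance(false, "orders_REALTIME", busy)); // prints the rejection message
  }
}
```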
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the responsibility of the caller to ensure
Review Comment:
This is the synchronous table rebalance API, which performs the rebalance on the caller's thread. The async APIs perform the rebalance on the rebalance executor thread pool, which is sized according to the controller config. The sync APIs exist for use cases like dry-run rebalance requests, segment-relocator-initiated rebalances (which run on a separate controller thread pool, as you pointed out in another comment), and tenant rebalances (which use the same rebalance thread pool internally; this API allows for more flexible coupling).
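Below is a minimal, JDK-only sketch of the sync/async split. The class name, the string result, and the pool size are hypothetical stand-ins for `TableRebalanceManager`, `RebalanceResult`, and `CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS`. The sketch only shows where the work runs, not what a rebalance does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the sync method runs on the caller's thread, while the async method hands the
// same work to a fixed-size pool, so at most numRebalanceThreads rebalances run at once.
class SyncAsyncRebalanceSketch {
  private final ExecutorService rebalanceExecutor;

  SyncAsyncRebalanceSketch(int numRebalanceThreads) {
    this.rebalanceExecutor = Executors.newFixedThreadPool(numRebalanceThreads);
  }

  // Synchronous path: the work (here just a message) runs on whichever thread calls this method.
  String rebalanceTable(String tableNameWithType) {
    return "rebalanced " + tableNameWithType + " on " + Thread.currentThread().getName();
  }

  // Asynchronous path: the same work is queued on the bounded rebalance pool.
  CompletableFuture<String> rebalanceTableAsync(String tableNameWithType) {
    return CompletableFuture.supplyAsync(() -> rebalanceTable(tableNameWithType), rebalanceExecutor);
  }

  void shutdown() {
    rebalanceExecutor.shutdown();
  }

  public static void main(String[] args) {
    SyncAsyncRebalanceSketch manager = new SyncAsyncRebalanceSketch(2);
    System.out.println(manager.rebalanceTable("orders_OFFLINE")); // runs on the caller's thread
    System.out.println(manager.rebalanceTableAsync("orders_OFFLINE").join()); // runs on a pool thread
    manager.shutdown();
  }
}
```

In this sketch, extra async requests wait in the fixed pool's queue once all threads are busy. This is the waiting queue that routing dry runs through the synchronous path avoids.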
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.