This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6b1b4feba Tenant rebalance and status tracking APIs (#11128)
d6b1b4feba is described below
commit d6b1b4feba1f9a2168f5119e1b673d7cfcf8d146
Author: Shounak kulkarni <[email protected]>
AuthorDate: Sat Aug 12 02:23:10 2023 +0530
Tenant rebalance and status tracking APIs (#11128)
* endpoint to rebalance tables of a tenant
* move tenant rebalance logic to a separate TenantRebalancer service
* implement extractRebalanceConfig
* function rename
* observability
* status reporting
* moved tenant rebalance related classes to a separate package
* ignore null values in response on RebalanceResult
* bugfix: initialize the _remainingTables field
* integration test for tenant rebalance
* break while loop gracefully
* reuse the last active parallel thread to run the sequential jobs
* handle NPE on missing metadata
* maintain separate executor service for tenant rebalance operations
* prioritise dimension tables
* var name fix
* todo comment added
* Add Authorize annotations to APIs
---
.../metadata/controllerjob/ControllerJobType.java | 2 +-
.../pinot/controller/BaseControllerStarter.java | 10 +
.../api/resources/PinotTenantRestletResource.java | 55 +++++
.../TenantRebalanceJobStatusResponse.java | 43 ++++
.../helix/core/rebalance/RebalanceContext.java | 138 ++++++++++++
.../helix/core/rebalance/RebalanceResult.java | 5 +
.../rebalance/tenant/DefaultTenantRebalancer.java | 234 +++++++++++++++++++++
.../rebalance/tenant/TenantRebalanceContext.java | 81 +++++++
.../rebalance/tenant/TenantRebalanceObserver.java | 21 +-
.../tenant/TenantRebalanceProgressStats.java | 114 ++++++++++
.../rebalance/tenant/TenantRebalanceResult.java | 59 ++++++
.../core/rebalance/tenant/TenantRebalancer.java | 7 +-
.../tenant/ZkBasedTenantRebalanceObserver.java | 116 ++++++++++
.../rebalance/tenant/TenantRebalancerTest.java | 191 +++++++++++++++++
.../java/org/apache/pinot/core/auth/Actions.java | 1 +
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
16 files changed, 1071 insertions(+), 7 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
index aaad454787..025d7b6c39 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
@@ -19,5 +19,5 @@
package org.apache.pinot.common.metadata.controllerjob;
+/**
+ * Types of controller jobs. The type is passed together with the job id when controller job metadata is looked up,
+ * e.g. {@code getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE)}.
+ */
public enum ControllerJobType {
- RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+ RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 594ca27bfd..610c0a01f0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -85,6 +85,8 @@ import
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
import
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
@@ -173,6 +175,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected StaleInstancesCleanupTask _staleInstancesCleanupTask;
protected TaskMetricsEmitter _taskMetricsEmitter;
protected MultiThreadedHttpConnectionManager _connectionManager;
+ protected TenantRebalancer _tenantRebalancer;
+ protected ExecutorService _tenantRebalanceExecutorService;
@Override
public void init(PinotConfiguration pinotConfiguration)
@@ -223,6 +227,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
// This executor service is used to do async tasks from multiget util or
table rebalancing.
_executorService =
Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
+ _tenantRebalanceExecutorService =
+ Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("tenant-rebalance-thread-%d").build());
+ _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager,
_tenantRebalanceExecutorService);
}
// Initialize the table config tuner registry.
@@ -484,6 +491,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
+ bind(_tenantRebalancer).to(TenantRebalancer.class);
String loggerRootDir =
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
if (loggerRootDir != null) {
bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class);
@@ -781,6 +789,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Shutting down executor service");
_executorService.shutdownNow();
_executorService.awaitTermination(10L, TimeUnit.SECONDS);
+ _tenantRebalanceExecutorService.shutdownNow();
+ _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (final Exception e) {
LOGGER.error("Caught exception while shutting down", e);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 70a24e15e4..f160bba80d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -32,6 +32,7 @@ import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -49,6 +50,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.api.access.AccessType;
@@ -56,6 +58,10 @@ import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
@@ -64,6 +70,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +113,9 @@ public class PinotTenantRestletResource {
@Inject
ControllerMetrics _controllerMetrics;
+ @Inject
+ TenantRebalancer _tenantRebalancer;
+
@POST
@Path("/tenants")
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.CREATE_TENANT)
@@ -565,4 +575,49 @@ public class PinotTenantRestletResource {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_TENANT_DELETE_ERROR,
1L);
throw new ControllerApplicationException(LOGGER, "Error deleting tenant",
Response.Status.INTERNAL_SERVER_ERROR);
}
+
+  /**
+   * Rebalances all tables whose server tenant is {@code tenantName}.
+   *
+   * @param tenantName tenant taken from the request path; it overrides anything in the body (the field is
+   *                   {@code @JsonIgnore}d on {@link TenantRebalanceContext})
+   * @param context rebalance options from the request body
+   * @return per-table results plus, for a non-dry run, the tenant rebalance job id
+   * @throws ControllerApplicationException with 400 when the request carries no body
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.REBALANCE_TENANT_TABLES)
+  @Path("/tenants/{tenantName}/rebalance")
+  @ApiOperation(value = "Rebalances all the tables that are part of the tenant")
+  public TenantRebalanceResult rebalance(
+      @ApiParam(value = "Name of the tenant whose tables are to be rebalanced", required = true)
+      @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceContext context) {
+    // "required = true" only documents the API; an empty request body may still arrive here as null.
+    if (context == null) {
+      throw new ControllerApplicationException(LOGGER, "Missing tenant rebalance config in the request body",
+          Response.Status.BAD_REQUEST);
+    }
+    context.setTenantName(tenantName);
+    return _tenantRebalancer.rebalance(context);
+  }
+
+  /**
+   * Returns the recorded progress of a tenant rebalance job together with the time it has been running.
+   *
+   * @param jobId tenant rebalance job id returned by the rebalance API
+   * @return progress stats plus elapsed seconds (time-to-finish once the job has a completion message)
+   * @throws ControllerApplicationException with 404 when the job or its progress stats cannot be found
+   * @throws JsonProcessingException when the stored progress stats cannot be parsed
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.READ)
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_REBALANCE_STATUS)
+  @Path("/tenants/rebalanceStatus/{jobId}")
+  @ApiOperation(value = "Gets detailed stats of a tenant rebalance operation",
+      notes = "Gets detailed stats of a tenant rebalance operation")
+  public TenantRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Tenant rebalance job id", required = true) @PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+
+    String progressStatsJson = controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS);
+    if (progressStatsJson == null) {
+      // Report a clear 404 instead of handing a null string to the JSON parser.
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find progress stats for tenant rebalance job id: " + jobId, Response.Status.NOT_FOUND);
+    }
+    TenantRebalanceProgressStats tenantRebalanceProgressStats =
+        JsonUtils.stringToObject(progressStatsJson, TenantRebalanceProgressStats.class);
+
+    // While no completion message is recorded the job is treated as running, so report the time elapsed so far
+    // rather than the stored time-to-finish.
+    long timeSinceStartInSecs = tenantRebalanceProgressStats.getTimeToFinishInSeconds();
+    if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) {
+      timeSinceStartInSecs = (System.currentTimeMillis() - tenantRebalanceProgressStats.getStartTimeMs()) / 1000;
+    }
+
+    TenantRebalanceJobStatusResponse tenantRebalanceJobStatusResponse = new TenantRebalanceJobStatusResponse();
+    tenantRebalanceJobStatusResponse.setTenantRebalanceProgressStats(tenantRebalanceProgressStats);
+    tenantRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+    return tenantRebalanceJobStatusResponse;
+  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
new file mode 100644
index 0000000000..db1bfda9e8
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+
+
+/**
+ * Response body of the tenant rebalance status API: how long the job has been running (or took, once complete)
+ * and the progress stats recorded for it.
+ */
+public class TenantRebalanceJobStatusResponse {
+  // Seconds since the job started; for a completed job this holds the recorded time-to-finish.
+  private long _timeElapsedSinceStartInSeconds;
+  // Progress stats deserialized from the job's stored metadata.
+  private TenantRebalanceProgressStats _tenantRebalanceProgressStats;
+
+  public long getTimeElapsedSinceStartInSeconds() {
+    return _timeElapsedSinceStartInSeconds;
+  }
+
+  public void setTimeElapsedSinceStartInSeconds(long elapsedSeconds) {
+    _timeElapsedSinceStartInSeconds = elapsedSeconds;
+  }
+
+  public TenantRebalanceProgressStats getTenantRebalanceProgressStats() {
+    return _tenantRebalanceProgressStats;
+  }
+
+  public void setTenantRebalanceProgressStats(TenantRebalanceProgressStats progressStats) {
+    _tenantRebalanceProgressStats = progressStats;
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
new file mode 100644
index 0000000000..cd6e06c399
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * Table-level rebalance options accepted in a rebalance request body. Each option is bound to the JSON property named
+ * in its {@code @JsonProperty} annotation and keeps the initializer value below when the request omits it. The tenant
+ * rebalancer copies these options one-to-one into the {@code Configuration} it passes to each table rebalance.
+ */
+@ApiModel
+public class RebalanceContext {
+  // TODO : simplify the rebalance configs wherever possible
+
+  // Boolean flags, all off unless the request enables them.
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")
+  @ApiModelProperty(example = "false")
+  private Boolean _reassignInstances = false;
+  @JsonProperty("includeConsuming")
+  @ApiModelProperty(example = "false")
+  private Boolean _includeConsuming = false;
+  @JsonProperty("bootstrap")
+  @ApiModelProperty(example = "false")
+  private Boolean _bootstrap = false;
+  @JsonProperty("downtime")
+  @ApiModelProperty(example = "false")
+  private Boolean _downtime = false;
+  // Forwarded as the minimum number of replicas to keep up during a no-downtime rebalance.
+  @JsonProperty("minAvailableReplicas")
+  @ApiModelProperty(example = "1")
+  private Integer _minAvailableReplicas = 1;
+  @JsonProperty("bestEfforts")
+  @ApiModelProperty(example = "false")
+  private Boolean _bestEfforts = false;
+  // External-view polling interval and overall stabilization timeout, both in milliseconds.
+  @JsonProperty("externalViewCheckIntervalInMs")
+  @ApiModelProperty(example = "1000")
+  private Long _externalViewCheckIntervalInMs = 1000L;
+  @JsonProperty("externalViewStabilizationTimeoutInMs")
+  @ApiModelProperty(example = "3600000")
+  private Long _externalViewStabilizationTimeoutInMs = 3600000L;
+  @JsonProperty("updateTargetTier")
+  @ApiModelProperty(example = "false")
+  private Boolean _updateTargetTier = false;
+
+  public Boolean isDryRun() {
+    return _dryRun;
+  }
+
+  public void setDryRun(Boolean value) {
+    _dryRun = value;
+  }
+
+  public Boolean isReassignInstances() {
+    return _reassignInstances;
+  }
+
+  public void setReassignInstances(Boolean value) {
+    _reassignInstances = value;
+  }
+
+  public Boolean isIncludeConsuming() {
+    return _includeConsuming;
+  }
+
+  public void setIncludeConsuming(Boolean value) {
+    _includeConsuming = value;
+  }
+
+  public Boolean isBootstrap() {
+    return _bootstrap;
+  }
+
+  public void setBootstrap(Boolean value) {
+    _bootstrap = value;
+  }
+
+  public Boolean isDowntime() {
+    return _downtime;
+  }
+
+  public void setDowntime(Boolean value) {
+    _downtime = value;
+  }
+
+  public Integer getMinAvailableReplicas() {
+    return _minAvailableReplicas;
+  }
+
+  public void setMinAvailableReplicas(Integer value) {
+    _minAvailableReplicas = value;
+  }
+
+  public Boolean isBestEfforts() {
+    return _bestEfforts;
+  }
+
+  public void setBestEfforts(Boolean value) {
+    _bestEfforts = value;
+  }
+
+  public Long getExternalViewCheckIntervalInMs() {
+    return _externalViewCheckIntervalInMs;
+  }
+
+  public void setExternalViewCheckIntervalInMs(Long intervalMs) {
+    _externalViewCheckIntervalInMs = intervalMs;
+  }
+
+  public Long getExternalViewStabilizationTimeoutInMs() {
+    return _externalViewStabilizationTimeoutInMs;
+  }
+
+  public void setExternalViewStabilizationTimeoutInMs(Long timeoutMs) {
+    _externalViewStabilizationTimeoutInMs = timeoutMs;
+  }
+
+  public Boolean isUpdateTargetTier() {
+    return _updateTargetTier;
+  }
+
+  public void setUpdateTargetTier(Boolean value) {
+    _updateTargetTier = value;
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index c76d06fcaf..ed3ad624d8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import javax.annotation.Nullable;
@@ -28,11 +29,15 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
public class RebalanceResult {
private final String _jobId;
private final Status _status;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<InstancePartitionsType, InstancePartitions>
_instanceAssignment;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, InstancePartitions> _tierInstanceAssignment;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, Map<String, String>> _segmentAssignment;
private final String _description;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
new file mode 100644
index 0000000000..d0278c15e3
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rebalances every table of a tenant. It computes dry-run results synchronously, then schedules the actual table
+ * rebalances on an executor service and reports progress through a {@link ZkBasedTenantRebalanceObserver}.
+ */
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  final PinotHelixResourceManager _pinotHelixResourceManager;
+  final ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalances every table whose server tenant is {@code context.getTenantName()}.
+   *
+   * <p>A dry-run rebalance is first computed for each table. For a dry run those results are returned directly.
+   * Otherwise the real table rebalances are submitted to the executor and this method returns right away with a
+   * tenant rebalance job id. Up to {@code degreeOfParallelism} workers drain the parallel queue, and the last worker
+   * to finish then rebalances the sequential tables one by one. Dimension tables go to the front of both queues.
+   *
+   * @param context tenant name plus table-level and tenant-level rebalance options
+   * @return per-table results, plus the tenant job id when the rebalance was actually scheduled
+   */
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    // Only an explicit dry run stops here. The pass above always runs with DRY_RUN=true, so also returning early for
+    // downtime rebalances would report results without ever moving a segment.
+    if (context.isDryRun()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    }
+    for (Map.Entry<String, RebalanceResult> entry : rebalanceResult.entrySet()) {
+      RebalanceResult result = entry.getValue();
+      if (result.getStatus() == RebalanceResult.Status.DONE) {
+        entry.setValue(new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+            "In progress, check controller task status for the progress", result.getInstanceAssignment(),
+            result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    final Deque<String> sequentialQueue = new LinkedList<>();
+    final Deque<String> parallelQueue = new ConcurrentLinkedDeque<>();
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
+    Set<String> dimTables = getDimensionalTables(context.getTenantName());
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      if (parallelism > 1) {
+        Set<String> parallelTables;
+        if (!context.getParallelWhitelist().isEmpty()) {
+          // Restrict the whitelist to this tenant's tables. A table from another tenant has no dry-run result and
+          // would fail inside a worker thread.
+          parallelTables = Sets.intersection(tables, context.getParallelWhitelist());
+        } else {
+          parallelTables = tables;
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        enqueue(parallelQueue, parallelTables, dimTables);
+        enqueue(sequentialQueue, Sets.difference(tables, parallelTables), dimTables);
+      } else {
+        enqueue(sequentialQueue, tables, dimTables);
+      }
+
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          try {
+            rebalanceTablesInQueue(parallelQueue, context, rebalanceResult, observer);
+          } catch (Exception e) {
+            // Keep going to the countdown below; otherwise the sequential tables would never be rebalanced.
+            LOGGER.error("Caught exception while rebalancing parallel tables of tenant: {}", context.getTenantName(),
+                e);
+          }
+          // The last parallel worker to finish reuses its thread to run the sequential table rebalances.
+          if (activeThreads.decrementAndGet() == 0) {
+            rebalanceTablesInQueue(sequentialQueue, context, rebalanceResult, observer);
+            // NOTE(review): reported even if some tables failed; those were already reported individually
+            // through REBALANCE_ERRORED_TRIGGER.
+            observer.onSuccess(String.format("Successfully rebalanced tenant %s.", context.getTenantName()));
+          }
+        });
+      }
+    } catch (Exception exception) {
+      LOGGER.error("Caught exception while scheduling rebalance of tenant: {}", context.getTenantName(), exception);
+      observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", context.getTenantName(),
+          exception.getMessage()));
+    }
+    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, context.isVerboseResult());
+  }
+
+  /** Appends {@code tablesToAdd} to {@code queue}, placing dimension tables at the front so they run first. */
+  private static void enqueue(Deque<String> queue, Set<String> tablesToAdd, Set<String> dimTables) {
+    for (String table : tablesToAdd) {
+      if (dimTables.contains(table)) {
+        queue.addFirst(table);
+      } else {
+        queue.addLast(table);
+      }
+    }
+  }
+
+  /**
+   * Polls tables from {@code queue} until it is empty and rebalances each one for real (DRY_RUN=false), reusing the
+   * job id from that table's dry-run result.
+   */
+  private void rebalanceTablesInQueue(Deque<String> queue, TenantRebalanceContext context,
+      Map<String, RebalanceResult> dryRunResults, TenantRebalanceObserver observer) {
+    String table;
+    while ((table = queue.pollFirst()) != null) {
+      Configuration config = extractRebalanceConfig(context);
+      config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+      RebalanceResult dryRunResult = dryRunResults.get(table);
+      if (dryRunResult != null) {
+        config.setProperty(RebalanceConfigConstants.JOB_ID, dryRunResult.getJobId());
+      }
+      rebalanceTable(table, config, observer);
+    }
+  }
+
+  /** Returns the dimension tables whose server tenant is {@code tenantName}; tables without a config are skipped. */
+  private Set<String> getDimensionalTables(String tenantName) {
+    Set<String> dimTables = new HashSet<>();
+    for (String table : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(table);
+      if (tableConfig == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", table);
+        continue;
+      }
+      if (tenantName.equals(tableConfig.getTenantConfig().getServer()) && tableConfig.isDimTable()) {
+        dimTables.add(table);
+      }
+    }
+    return dimTables;
+  }
+
+  /** Copies the table-level rebalance options of {@code context} into a fresh config with a new random job id. */
+  private Configuration extractRebalanceConfig(TenantRebalanceContext context) {
+    Configuration rebalanceConfig = new BaseConfiguration();
+    rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, context.isDryRun());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, context.isReassignInstances());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, context.isIncludeConsuming());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, context.isBootstrap());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, context.isDowntime());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
+        context.getMinAvailableReplicas());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, context.isBestEfforts());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
+        context.getExternalViewCheckIntervalInMs());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
+        context.getExternalViewStabilizationTimeoutInMs());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, context.isUpdateTargetTier());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, createUniqueRebalanceJobIdentifier());
+    return rebalanceConfig;
+  }
+
+  private String createUniqueRebalanceJobIdentifier() {
+    return UUID.randomUUID().toString();
+  }
+
+  /** Returns all tables whose server tenant is {@code tenantName}; tables without a config are skipped. */
+  private Set<String> getTenantTables(String tenantName) {
+    Set<String> tables = new HashSet<>();
+    for (String table : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(table);
+      if (tableConfig == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", table);
+        continue;
+      }
+      String tableConfigTenant = tableConfig.getTenantConfig().getServer();
+      if (tenantName.equals(tableConfigTenant)) {
+        tables.add(table);
+      }
+    }
+    return tables;
+  }
+
+  /**
+   * Rebalances one table and reports its start, completion, or failure to {@code observer}. Errors are logged and
+   * reported, never rethrown.
+   */
+  private void rebalanceTable(String tableName, Configuration config, TenantRebalanceObserver observer) {
+    try {
+      observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName,
+          config.getString(RebalanceConfigConstants.JOB_ID));
+      RebalanceResult result = _pinotHelixResourceManager.rebalanceTable(tableName, config, true);
+      if (result.getStatus() == RebalanceResult.Status.DONE) {
+        observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null);
+      } else {
+        observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+            result.getDescription());
+      }
+    } catch (Throwable t) {
+      // Keep the stack trace; the observer only receives a summary message.
+      LOGGER.error("Caught exception/error while rebalancing table: {}", tableName, t);
+      observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+          String.format("Caught exception/error while rebalancing table: %s", tableName));
+    }
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
new file mode 100644
index 0000000000..5e76dcc014
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext;
+
+
+public class TenantRebalanceContext extends RebalanceContext {
+ @JsonIgnore
+ private String _tenantName;
+ @JsonProperty("degreeOfParallelism")
+ @ApiModelProperty(example = "1")
+ private Integer _degreeOfParallelism = 1;
+ @JsonProperty("parallelWhitelist")
+ private Set<String> _parallelWhitelist = new HashSet<>();
+ @JsonProperty("parallelBlacklist")
+ private Set<String> _parallelBlacklist = new HashSet<>();
+
+ private boolean _verboseResult = false;
+
+ public String getTenantName() {
+ return _tenantName;
+ }
+
+ public void setTenantName(String tenantName) {
+ _tenantName = tenantName;
+ }
+
+ public int getDegreeOfParallelism() {
+ return _degreeOfParallelism;
+ }
+
+ public void setDegreeOfParallelism(int degreeOfParallelism) {
+ _degreeOfParallelism = degreeOfParallelism;
+ }
+
+ public Set<String> getParallelWhitelist() {
+ return _parallelWhitelist;
+ }
+
+ public void setParallelWhitelist(Set<String> parallelWhitelist) {
+ _parallelWhitelist = parallelWhitelist;
+ }
+
+ public Set<String> getParallelBlacklist() {
+ return _parallelBlacklist;
+ }
+
+ public void setParallelBlacklist(Set<String> parallelBlacklist) {
+ _parallelBlacklist = parallelBlacklist;
+ }
+
+ public boolean isVerboseResult() {
+ return _verboseResult;
+ }
+
+ public void setVerboseResult(boolean verboseResult) {
+ _verboseResult = verboseResult;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
similarity index 59%
copy from
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
copy to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
index aaad454787..82dd086362 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
@@ -16,8 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.metadata.controllerjob;
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
-public enum ControllerJobType {
- RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
/**
 * Callback interface notified about the progress of a tenant rebalance.
 */
public interface TenantRebalanceObserver {

  /**
   * Events reported through {@link #onTrigger}.
   */
  enum Trigger {
    /** The tenant rebalance as a whole has started. */
    START_TRIGGER,
    /** Rebalance of an individual table has started. */
    REBALANCE_STARTED_TRIGGER,
    /** Rebalance of an individual table has completed. */
    REBALANCE_COMPLETED_TRIGGER,
    /** Rebalance of an individual table has failed. */
    REBALANCE_ERRORED_TRIGGER
  }

  /**
   * Reports an event.
   *
   * @param trigger the event being reported
   * @param tableName table the event refers to
   * @param description free-form detail accompanying the event; may be null
   */
  void onTrigger(Trigger trigger, String tableName, String description);

  /**
   * Reports that the tenant rebalance finished successfully.
   *
   * @param msg completion message
   */
  void onSuccess(String msg);

  /**
   * Reports that the tenant rebalance failed.
   *
   * @param errorMsg failure message
   */
  void onError(String errorMsg);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
new file mode 100644
index 0000000000..fb77100ece
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
/**
 * Mutable progress snapshot of a tenant rebalance.
 *
 * <p>Holds a per-table status map, a per-table rebalance job id map, table counters, start time,
 * time-to-finish and a completion message. Instances are serialized to JSON by the tenant rebalance
 * observer, and the no-arg constructor exists so they can be deserialized again.
 */
public class TenantRebalanceProgressStats {
  // status map of tables and their respective rebalance status.
  // Initialized here so an instance built with the no-arg constructor can still record statuses.
  private Map<String, String> _tableStatusMap = new HashMap<>();
  private final Map<String, String> _tableRebalanceJobIdMap = new HashMap<>();
  private int _totalTables;
  private int _remainingTables;
  // When did Rebalance start
  private long _startTimeMs;
  // How long did rebalance take
  private long _timeToFinishInSeconds;
  // Success/failure message
  private String _completionStatusMsg;

  public TenantRebalanceProgressStats() {
  }

  /**
   * Creates stats in which every table in {@code tables} is UNPROCESSED and all tables are remaining.
   *
   * @param tables tables taking part in the rebalance
   */
  public TenantRebalanceProgressStats(Set<String> tables) {
    // Collectors.toMap(keyMapper, valueMapper) gives no mutability guarantee, but updateTableStatus() later
    // calls put(); request a HashMap explicitly.
    _tableStatusMap = tables.stream()
        .collect(Collectors.toMap(Function.identity(), table -> TableStatus.UNPROCESSED.name(),
            (first, second) -> first, HashMap::new));
    _totalTables = tables.size();
    _remainingTables = _totalTables;
  }

  public Map<String, String> getTableStatusMap() {
    return _tableStatusMap;
  }

  public void setTableStatusMap(Map<String, String> tableStatusMap) {
    _tableStatusMap = tableStatusMap;
  }

  public int getTotalTables() {
    return _totalTables;
  }

  public void setTotalTables(int totalTables) {
    _totalTables = totalTables;
  }

  public int getRemainingTables() {
    return _remainingTables;
  }

  public void setRemainingTables(int remainingTables) {
    _remainingTables = remainingTables;
  }

  public long getStartTimeMs() {
    return _startTimeMs;
  }

  public void setStartTimeMs(long startTimeMs) {
    _startTimeMs = startTimeMs;
  }

  public long getTimeToFinishInSeconds() {
    return _timeToFinishInSeconds;
  }

  public void setTimeToFinishInSeconds(long timeToFinishInSeconds) {
    _timeToFinishInSeconds = timeToFinishInSeconds;
  }

  public String getCompletionStatusMsg() {
    return _completionStatusMsg;
  }

  public void setCompletionStatusMsg(String completionStatusMsg) {
    _completionStatusMsg = completionStatusMsg;
  }

  /**
   * Records {@code status} for {@code tableName}, replacing any previous status.
   */
  public void updateTableStatus(String tableName, String status) {
    if (_tableStatusMap == null) {
      // setTableStatusMap(null), e.g. from deserializing an explicit JSON null, must not break updates.
      _tableStatusMap = new HashMap<>();
    }
    _tableStatusMap.put(tableName, status);
  }

  /**
   * Records the rebalance job id used for {@code tableName}.
   */
  public void putTableRebalanceJobId(String tableName, String jobId) {
    _tableRebalanceJobIdMap.put(tableName, jobId);
  }

  public Map<String, String> getTableRebalanceJobIdMap() {
    return _tableRebalanceJobIdMap;
  }

  public enum TableStatus {
    UNPROCESSED, PROCESSING, PROCESSED
  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
new file mode 100644
index 0000000000..57c17054d7
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TenantRebalanceResult {
+ private String _jobId;
+ private Map<String, RebalanceResult> _rebalanceTableResults;
+
+ public TenantRebalanceResult(String jobId, Map<String, RebalanceResult>
rebalanceTableResults, boolean verbose) {
+ _jobId = jobId;
+ if (verbose) {
+ _rebalanceTableResults = rebalanceTableResults;
+ } else {
+ _rebalanceTableResults = new HashMap<>();
+ rebalanceTableResults.forEach((table, result) -> {
+ _rebalanceTableResults.put(table, new
RebalanceResult(result.getJobId(), result.getStatus(),
+ result.getDescription(), null, null, null));
+ });
+ }
+ }
+
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public Map<String, RebalanceResult> getRebalanceTableResults() {
+ return _rebalanceTableResults;
+ }
+
+ public void setJobId(String jobId) {
+ _jobId = jobId;
+ }
+
+ public void setRebalanceTableResults(Map<String, RebalanceResult>
rebalanceTableResults) {
+ _rebalanceTableResults = rebalanceTableResults;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
similarity index 82%
copy from
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
copy to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index aaad454787..53df7824d5 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.metadata.controllerjob;
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
-public enum ControllerJobType {
- RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+
+public interface TenantRebalancer {
+ TenantRebalanceResult rebalance(TenantRebalanceContext context);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
new file mode 100644
index 0000000000..7521caa3f3
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final String _jobId;
+ private final String _tenantName;
+ private final List<String> _unprocessedTables;
+ private final TenantRebalanceProgressStats _progressStats;
+ // Keep track of number of updates. Useful during debugging.
+ private int _numUpdatesToZk;
+
+ public ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
Set<String> tables,
+ PinotHelixResourceManager pinotHelixResourceManager) {
+ Preconditions.checkState(tables != null && !tables.isEmpty(), "List of
tables to observe is empty.");
+ _jobId = jobId;
+ _tenantName = tenantName;
+ _unprocessedTables = new ArrayList<>(tables);
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _progressStats = new TenantRebalanceProgressStats(tables);
+ _numUpdatesToZk = 0;
+ }
+
+ @Override
+ public void onTrigger(Trigger trigger, String tableName, String description)
{
+ switch (trigger) {
+ case START_TRIGGER:
+ _progressStats.setStartTimeMs(System.currentTimeMillis());
+ break;
+ case REBALANCE_STARTED_TRIGGER:
+ _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+ _progressStats.putTableRebalanceJobId(tableName, description);
+ break;
+ case REBALANCE_COMPLETED_TRIGGER:
+ _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
+ _unprocessedTables.remove(tableName);
+ _progressStats.setRemainingTables(_unprocessedTables.size());
+ break;
+ case REBALANCE_ERRORED_TRIGGER:
+ _progressStats.updateTableStatus(tableName, description);
+ _unprocessedTables.remove(tableName);
+ _progressStats.setRemainingTables(_unprocessedTables.size());
+ break;
+ default:
+ }
+ trackStatsInZk();
+ }
+
+ @Override
+ public void onSuccess(String msg) {
+ _progressStats.setCompletionStatusMsg(msg);
+ _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
+ trackStatsInZk();
+ }
+
+ @Override
+ public void onError(String errorMsg) {
+ _progressStats.setCompletionStatusMsg(errorMsg);
+ _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
+ trackStatsInZk();
+ }
+
+ private void trackStatsInZk() {
+ Map<String, String> jobMetadata = new HashMap<>();
+ jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TENANT_REBALANCE.name());
+ try {
+ jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(_progressStats));
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
+ }
+ _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
+
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TENANT_REBALANCE));
+ _numUpdatesToZk++;
+ LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
new file mode 100644
index 0000000000..a4237fee3f
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class TenantRebalancerTest extends ControllerTest {
+
+ private static final String DEFAULT_TENANT_NAME = "DefaultTenant";
+ private static final String TENANT_NAME = "TestTenant";
+ private static final String RAW_TABLE_NAME_A = "testTableA";
+ private static final String OFFLINE_TABLE_NAME_A =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_A);
+ private static final String RAW_TABLE_NAME_B = "testTableB";
+ private static final String OFFLINE_TABLE_NAME_B =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_B);
+ private static final int NUM_REPLICAS = 3;
+ ExecutorService _executorService;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ startZk();
+ startController();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ _executorService = Executors.newFixedThreadPool(3);
+ }
+
+ @Test
+ public void testRebalance()
+ throws Exception {
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ TenantRebalancer tenantRebalancer = new
DefaultTenantRebalancer(_helixResourceManager, _executorService);
+
+ // tag all servers and brokers to test tenant
+ addTenantTagToInstances(TENANT_NAME);
+
+ // create 2 tables, one on each of test tenant and default tenant
+ createTableWithSegments(RAW_TABLE_NAME_A, DEFAULT_TENANT_NAME);
+ createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME);
+
+ // Add 3 more servers which will be tagged to default tenant
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
(numServers + i), true);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
+
+ // rebalance the tables on test tenant
+ TenantRebalanceContext context = new TenantRebalanceContext();
+ context.setTenantName(TENANT_NAME);
+ context.setVerboseResult(true);
+ TenantRebalanceResult result = tenantRebalancer.rebalance(context);
+ RebalanceResult rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ Map<String, Map<String, String>> rebalancedAssignment =
rebalanceResult.getSegmentAssignment();
+ // assignment should not change, with a NO_OP status as no now server is
added to test tenant
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertEquals(oldSegmentAssignment, rebalancedAssignment);
+
+ // rebalance the tables on default tenant
+ context.setTenantName(DEFAULT_TENANT_NAME);
+ result = tenantRebalancer.rebalance(context);
+ // rebalancing default tenant should distribute the segment of table A
over 6 servers
+ rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ InstancePartitions partitions =
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
+ assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 6);
+
+ // ensure the ideal state and external view converges
+ assertTrue(waitForCompletion(result.getJobId()));
+ TenantRebalanceProgressStats progressStats =
getProgress(result.getJobId());
+
assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
+ assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+ TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
+ Map<String, Map<String, String>> idealState =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+ Map<String, Map<String, String>> externalView =
+
_helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+ assertEquals(idealState, externalView);
+ }
+
+ private boolean waitForCompletion(String jobId) {
+ int retries = 5;
+ while (retries > 0) {
+ try {
+ TenantRebalanceProgressStats stats = getProgress(jobId);
+ if (stats != null && stats.getRemainingTables() == 0) {
+ return true;
+ }
+ retries--;
+ Thread.sleep(2000);
+ } catch (JsonProcessingException | InterruptedException e) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ private TenantRebalanceProgressStats getProgress(String jobId)
+ throws JsonProcessingException {
+ Map<String, String> controllerJobZKMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TENANT_REBALANCE);
+ if (controllerJobZKMetadata == null) {
+ return null;
+ }
+ return
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+ TenantRebalanceProgressStats.class);
+ }
+
+ private void createTableWithSegments(String rawTableName, String tenant)
+ throws IOException {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
+
.setServerTenant(tenant).setBrokerTenant(tenant).setNumReplicas(NUM_REPLICAS).build();
+ // Create the table
+ _helixResourceManager.addTable(tableConfig);
+ // Add the segments
+ int numSegments = 10;
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(offlineTableName,
+ SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName,
"segment_" + i), null);
+ }
+ }
+
+ private void addTenantTagToInstances(String testTenant) {
+ String offlineTag = TagNameUtils.getOfflineTagForTenant(testTenant);
+ String brokerTag = TagNameUtils.getBrokerTagForTenant(testTenant);
+ _helixResourceManager.getAllInstances().forEach(instance -> {
+ List<String> existingTags =
_helixResourceManager.getHelixInstanceConfig(instance).getTags();
+ if (instance.startsWith(SERVER_INSTANCE_ID_PREFIX)) {
+ existingTags.add(offlineTag);
+ } else if (instance.startsWith(BROKER_INSTANCE_ID_PREFIX)) {
+ existingTags.add(brokerTag);
+ }
+ _helixResourceManager.updateInstanceTags(instance, String.join(",",
existingTags), true);
+ });
+ }
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ _executorService.shutdown();
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 5c1516210d..e72d066bc9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -80,6 +80,7 @@ public class Actions {
public static final String UPDATE_TASK_QUEUE = "UpdateTaskQueue";
public static final String UPDATE_TENANT = "UpdateTenant";
public static final String UPDATE_TENANT_METADATA = "UpdateTenantMetadata";
+ public static final String REBALANCE_TENANT_TABLES =
"RebalanceTenantTables";
public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
public static final String UPDATE_USER = "UpdateUser";
public static final String UPDATE_ZNODE = "UpdateZnode";
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 180744a831..46b2c5148e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -754,6 +754,7 @@ public class CommonConstants {
*/
public static final String JOB_TYPE = "jobType";
public static final String TABLE_NAME_WITH_TYPE = "tableName";
+ public static final String TENANT_NAME = "tenantName";
public static final String JOB_ID = "jobId";
public static final String SUBMISSION_TIME_MS = "submissionTimeMs";
public static final String MESSAGE_COUNT = "messageCount";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]