This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f81b03c204 Refactor ControllerJobType enum into extensible interface
(#16106)
f81b03c204 is described below
commit f81b03c20493c9dd8ed818c1706262eb2dd64797
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Jun 16 04:31:35 2025 +0100
Refactor ControllerJobType enum into extensible interface (#16106)
---
.../pinot/controller/BaseControllerStarter.java | 4 +-
.../apache/pinot/controller/ControllerConf.java | 10 ++--
.../api/resources/PinotRealtimeTableResource.java | 6 +--
.../api/resources/PinotSegmentRestletResource.java | 4 +-
.../api/resources/PinotTableRestletResource.java | 14 +++---
.../api/resources/PinotTenantRestletResource.java | 4 +-
.../helix/core/PinotHelixResourceManager.java | 18 ++++----
...trollerJobType.java => ControllerJobTypes.java} | 22 ++++-----
.../helix/core/rebalance/RebalanceChecker.java | 8 ++--
.../core/rebalance/TableRebalanceManager.java | 9 ++--
.../rebalance/ZkBasedTableRebalanceObserver.java | 6 +--
.../tenant/ZkBasedTenantRebalanceObserver.java | 6 +--
.../helix/core/util/ControllerZkHelixUtils.java | 6 +--
.../helix/core/rebalance/RebalanceCheckerTest.java | 16 +++----
.../rebalance/tenant/TenantRebalancerTest.java | 4 +-
.../core/util/ControllerZkHelixUtilsTest.java | 30 ++++++------
.../tests/LLCRealtimeClusterIntegrationTest.java | 4 +-
.../tests/TableRebalanceIntegrationTest.java | 28 +++++------
.../pinot/spi/controller/ControllerJobType.java | 54 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
20 files changed, 154 insertions(+), 101 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 346fcc7b24..8ab5c6ac27 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -94,7 +94,7 @@ import
org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -281,7 +281,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled());
ContinuousJfrStarter.init(_config);
- ControllerJobType.init(_config);
+ ControllerJobTypes.init(_config);
}
/// Returns the default cluster configs to be stored in ZK as Helix cluster
config. These configs will then be
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 03f4c9bca7..4b64b2abf0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.utils.TimeUtils;
import static
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX;
import static
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_INSTANCE_ID;
import static
org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX;
+import static org.apache.pinot.spi.utils.CommonConstants.ControllerJob;
public class ControllerConf extends PinotConfiguration {
@@ -383,7 +384,6 @@ public class ControllerConf extends PinotConfiguration {
public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK =
"controller.tenant.rebalance.maxJobsInZK";
public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK =
"controller.reload.segment.maxJobsInZK";
public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK =
"controller.force.commit.maxJobsInZK";
- public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100;
private final Map<String, String> _invalidConfigs = new
ConcurrentHashMap<>();
@@ -1319,18 +1319,18 @@ public class ControllerConf extends PinotConfiguration {
}
public int getMaxTableRebalanceZkJobs() {
- return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK,
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+ return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK,
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
public int getMaxTenantRebalanceZkJobs() {
- return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK,
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+ return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK,
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
public int getMaxReloadSegmentZkJobs() {
- return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK,
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+ return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK,
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
public int getMaxForceCommitZkJobs() {
- return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK,
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+ return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK,
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index a07e952da6..e4e5a0d17d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -54,7 +54,7 @@ import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.core.auth.Actions;
@@ -233,7 +233,7 @@ public class PinotRealtimeTableResource {
throws Exception {
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId,
- ControllerJobType.FORCE_COMMIT);
+ ControllerJobTypes.FORCE_COMMIT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + forceCommitJobId,
Response.Status.NOT_FOUND);
@@ -259,7 +259,7 @@ public class PinotRealtimeTableResource {
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(segmentsYetToBeCommitted));
_pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId,
controllerJobZKMetadata,
- ControllerJobType.FORCE_COMMIT);
+ ControllerJobTypes.FORCE_COMMIT);
}
Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 560724fcc2..f00a1dcd7b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -86,7 +86,7 @@ import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.controller.util.TableTierReader;
@@ -533,7 +533,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Reload job id", required = true) @PathParam("jobId")
String reloadJobId)
throws Exception {
Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobType.RELOAD_SEGMENT);
+ _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + reloadJobId,
Status.NOT_FOUND);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 38a781677f..71a27d325a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -95,7 +95,7 @@ import
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
@@ -117,6 +117,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -767,7 +768,7 @@ public class PinotTableRestletResource {
}
public Map<String, String> getControllerJobMetadata(String jobId) {
- return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TABLE_REBALANCE);
+ return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TABLE_REBALANCE);
}
@DELETE
@@ -1131,12 +1132,13 @@ public class PinotTableRestletResource {
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableTypeFromRequest,
LOGGER);
- EnumSet<ControllerJobType> jobTypesToFilter = null;
+ Set<ControllerJobType> jobTypesToFilter = null;
if (StringUtils.isNotEmpty(jobTypesString)) {
+ jobTypesToFilter = new HashSet<>();
for (String jobTypeStr : StringUtils.split(jobTypesString, ',')) {
- ControllerJobType jobType;
+ ControllerJobTypes jobType;
try {
- jobType = ControllerJobType.valueOf(jobTypeStr.toUpperCase());
+ jobType = ControllerJobTypes.valueOf(jobTypeStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new ControllerApplicationException(LOGGER, "Unknown job type:
" + jobTypeStr,
Response.Status.BAD_REQUEST);
@@ -1147,7 +1149,7 @@ public class PinotTableRestletResource {
Map<String, Map<String, String>> result = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter ==
null
- ? EnumSet.allOf(ControllerJobType.class) : jobTypesToFilter,
+ ? new HashSet<>(EnumSet.allOf(ControllerJobTypes.class)) :
jobTypesToFilter,
jobMetadata ->
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
.equals(tableNameWithType)));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 45bee25da1..e2175e7577 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -61,7 +61,7 @@ import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
@@ -707,7 +707,7 @@ public class PinotTenantRestletResource {
@ApiParam(value = "Tenant rebalance job id", required = true)
@PathParam("jobId") String jobId)
throws JsonProcessingException {
Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TENANT_REBALANCE);
+ _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ead9b9d712..d377ed190f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -37,7 +37,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -150,7 +149,7 @@ import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -174,6 +173,7 @@ import org.apache.pinot.spi.config.user.ComponentType;
import org.apache.pinot.spi.config.user.RoleType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -2405,7 +2405,7 @@ public class PinotHelixResourceManager {
* Returns a Map of jobId to job's ZK metadata that passes the checker, like
for specific tables.
* @return A Map of jobId to job properties
*/
- public Map<String, Map<String, String>>
getAllJobs(EnumSet<ControllerJobType> jobTypes,
+ public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType>
jobTypes,
Predicate<Map<String, String>> jobMetadataChecker) {
return ControllerZkHelixUtils.getAllControllerJobs(jobTypes,
jobMetadataChecker, _propertyStore);
}
@@ -2425,14 +2425,14 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.RELOAD_SEGMENT.name());
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentNames);
if (instanceName != null) {
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME,
instanceName);
}
- return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobTypes.RELOAD_SEGMENT);
}
/**
@@ -2449,13 +2449,13 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.RELOAD_SEGMENT.name());
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numberOfMessagesSent));
if (instanceName != null) {
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME,
instanceName);
}
- return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobTypes.RELOAD_SEGMENT);
}
public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
long jobSubmissionTimeMs,
@@ -2464,11 +2464,11 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.FORCE_COMMIT.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.FORCE_COMMIT.name());
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
- return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.FORCE_COMMIT);
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobTypes.FORCE_COMMIT);
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
similarity index 83%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
index c4cd936d56..18e67c69a1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
@@ -26,6 +26,8 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+import org.apache.pinot.spi.controller.ControllerJobType;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,20 +36,18 @@ import org.slf4j.LoggerFactory;
/**
* Controller jobs that store metadata in the ZK property store.
*/
-public enum ControllerJobType {
+public enum ControllerJobTypes implements ControllerJobType {
RELOAD_SEGMENT,
FORCE_COMMIT,
TABLE_REBALANCE,
TENANT_REBALANCE;
- private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerJobType.class);
- private static final EnumMap<ControllerJobType, Integer> ZK_NUM_JOBS_LIMIT =
new EnumMap<>(ControllerJobType.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerJobTypes.class);
+ private static final EnumMap<ControllerJobTypes, Integer> ZK_NUM_JOBS_LIMIT
= new EnumMap<>(ControllerJobTypes.class);
- /**
- * Gets the maximum number of job metadata entries that can be stored in ZK
for this job type.
- */
+ @Override
public Integer getZkNumJobsLimit() {
- return ZK_NUM_JOBS_LIMIT.getOrDefault(this,
ControllerConf.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+ return ZK_NUM_JOBS_LIMIT.getOrDefault(this,
CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
public static void init(ControllerConf controllerConf) {
@@ -57,13 +57,7 @@ public enum ControllerJobType {
ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE,
controllerConf.getMaxTenantRebalanceZkJobs());
}
- /**
- * Checks if the job metadata entry can be safely deleted. Note that the job
metadata entry will only be attempted
- * to be deleted when the number of entries in the job metadata map exceeds
the configured limit for the job type.
- *
- * @param jobMetadataEntry The job metadata entry to check - a pair of job
ID and job metadata map
- * @return true if the job metadata entry can be safely deleted, false
otherwise
- */
+ @Override
public boolean canDelete(Pair<String, Map<String, String>> jobMetadataEntry)
{
switch (this) {
case TABLE_REBALANCE:
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index e7593bfdea..5c73cce4c5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.helix.core.rebalance;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -83,7 +82,7 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
private synchronized int retryRebalanceTables(Set<String>
tableNamesWithType) {
// Get all jobMetadata for all the given tables with a single ZK read.
Map<String, Map<String, String>> allJobMetadataByJobId =
-
_pinotHelixResourceManager.getAllJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE),
+
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE),
jobMetadata -> tableNamesWithType.contains(
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)));
Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new
HashMap<>();
@@ -216,7 +215,8 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
}
private static void abortExistingJobs(String tableNameWithType,
PinotHelixResourceManager pinotHelixResourceManager) {
- boolean updated =
pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
+ boolean updated =
+ pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
ControllerJobTypes.TABLE_REBALANCE,
jobMetadata -> {
String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
index 0747ce7208..1536186dde 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,7 +37,7 @@ import
org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ControllerMetrics;
import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -210,7 +209,7 @@ public class TableRebalanceManager {
*/
public List<String> cancelRebalance(String tableNameWithType) {
List<String> cancelledJobIds = new ArrayList<>();
- boolean updated = _resourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
+ boolean updated = _resourceManager.updateJobsForTable(tableNameWithType,
ControllerJobTypes.TABLE_REBALANCE,
jobMetadata -> {
String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
try {
@@ -246,7 +245,7 @@ public class TableRebalanceManager {
public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId)
throws JsonProcessingException {
Map<String, String> controllerJobZKMetadata =
- _resourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TABLE_REBALANCE);
+ _resourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TABLE_REBALANCE);
if (controllerJobZKMetadata == null) {
LOGGER.warn("Rebalance job with ID: {} not found", jobId);
throw new NotFoundException("Rebalance job with ID: " + jobId + " not
found");
@@ -292,7 +291,7 @@ public class TableRebalanceManager {
public static String rebalanceJobInProgress(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
// Get all jobMetadata for the given table with a single ZK read.
Map<String, Map<String, String>> allJobMetadataByJobId =
-
ControllerZkHelixUtils.getAllControllerJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE),
+
ControllerZkHelixUtils.getAllControllerJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE),
jobMetadata -> tableNameWithType.equals(
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)),
propertyStore);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index bd30e98168..32fedd509f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -29,7 +29,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -282,7 +282,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
Map<String, String> jobMetadata =
createJobMetadata(_tableNameWithType, _rebalanceJobId,
_tableRebalanceProgressStats, _tableRebalanceContext);
ControllerZkHelixUtils.addControllerJobToZK(_propertyStore,
_rebalanceJobId, jobMetadata,
- ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> {
+ ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> {
// In addition to updating job progress status, the observer also
checks if the job status is IN_PROGRESS.
// If not, then no need to update the job status, and we keep this
status to end the job promptly.
if (prevJobMetadata == null) {
@@ -318,7 +318,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(tableRebalanceProgressStats));
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 6513e47a2f..0d89a94d54 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -100,14 +100,14 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TENANT_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobType.TENANT_REBALANCE);
+ _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
_numUpdatesToZk++;
LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
index de1e8d6aa9..04b661af1e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
@@ -21,11 +21,11 @@ package org.apache.pinot.controller.helix.core.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Comparator;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
@@ -33,7 +33,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -90,7 +90,7 @@ public class ControllerZkHelixUtils {
* @param propertyStore the ZK property store to read from
* @return a map of jobId to job metadata for all the jobs that match the
given job types and metadata checker
*/
- public static Map<String, Map<String, String>>
getAllControllerJobs(EnumSet<ControllerJobType> jobTypes,
+ public static Map<String, Map<String, String>>
getAllControllerJobs(Set<ControllerJobType> jobTypes,
Predicate<Map<String, String>> jobMetadataChecker,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
Map<String, Map<String, String>> controllerJobs = new HashMap<>();
for (ControllerJobType jobType : jobTypes) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index b424fc7193..fbf3b2d6b9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -40,7 +40,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -370,7 +370,7 @@ public class RebalanceCheckerTest {
HelixManager helixZkManager = mock(HelixManager.class);
ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
String zkPath =
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE.name());
+
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobTypes.TABLE_REBALANCE.name());
ZNRecord jobsZnRecord = new ZNRecord("jobs");
when(propertyStore.get(eq(zkPath), any(),
eq(AccessOption.PERSISTENT))).thenReturn(jobsZnRecord);
when(helixZkManager.getClusterManagmentTool()).thenReturn(mock(HelixAdmin.class));
@@ -380,16 +380,16 @@ public class RebalanceCheckerTest {
pinotHelixManager.addControllerJobToZK("job1",
ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000",
"tableName", "table01"),
- ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
pinotHelixManager.addControllerJobToZK("job2",
ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000",
"tableName", "table01"),
- ControllerJobType.TABLE_REBALANCE, jmd -> false);
+ ControllerJobTypes.TABLE_REBALANCE, jmd -> false);
pinotHelixManager.addControllerJobToZK("job3",
ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000",
"tableName", "table02"),
- ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
pinotHelixManager.addControllerJobToZK("job4",
ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000",
"tableName", "table02"),
- ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
Map<String, Map<String, String>> jmds = jobsZnRecord.getMapFields();
assertEquals(jmds.size(), 3);
assertTrue(jmds.containsKey("job1"));
@@ -397,13 +397,13 @@ public class RebalanceCheckerTest {
assertTrue(jmds.containsKey("job4"));
Set<String> expectedJobs01 = new HashSet<>();
- pinotHelixManager.updateJobsForTable("table01",
ControllerJobType.TABLE_REBALANCE,
+ pinotHelixManager.updateJobsForTable("table01",
ControllerJobTypes.TABLE_REBALANCE,
jmd -> expectedJobs01.add(jmd.get("jobId")));
assertEquals(expectedJobs01.size(), 1);
assertTrue(expectedJobs01.contains("job1"));
Set<String> expectedJobs02 = new HashSet<>();
- pinotHelixManager.updateJobsForTable("table02",
ControllerJobType.TABLE_REBALANCE,
+ pinotHelixManager.updateJobsForTable("table02",
ControllerJobTypes.TABLE_REBALANCE,
jmd -> expectedJobs02.add(jmd.get("jobId")));
assertEquals(expectedJobs02.size(), 2);
assertTrue(expectedJobs02.contains("job3"));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index db23d24121..9a2db888a1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
@@ -149,7 +149,7 @@ public class TenantRebalancerTest extends ControllerTest {
private TenantRebalanceProgressStats getProgress(String jobId)
throws JsonProcessingException {
Map<String, String> controllerJobZKMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TENANT_REBALANCE);
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
if (controllerJobZKMetadata == null) {
return null;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
index 9aee5e9e11..70ea1ea5f8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.controller.helix.core.util;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
@@ -42,7 +42,7 @@ public class ControllerZkHelixUtilsTest {
// Setup job limits
ControllerConf controllerConf = mock(ControllerConf.class);
when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2);
- ControllerJobType.init(controllerConf);
+ ControllerJobTypes.init(controllerConf);
TableRebalanceProgressStats inProgressStats = new
TableRebalanceProgressStats();
inProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
@@ -53,24 +53,24 @@ public class ControllerZkHelixUtilsTest {
Map<String, Map<String, String>> jobMetadataMap = Map.of(
"job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"1000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(inProgressStats)),
"job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"3000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(completedStats)),
"job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"2000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(abortedStats)),
"job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"4000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(inProgressStats)),
"job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"5000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(inProgressStats))
);
Map<String, Map<String, String>> updatedJobMetadataMap =
- ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap,
ControllerJobType.TABLE_REBALANCE);
+ ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap,
ControllerJobTypes.TABLE_REBALANCE);
// Even though the limit is 2, we should not delete the in-progress jobs
assertEquals(updatedJobMetadataMap.size(), 3);
assertEquals(updatedJobMetadataMap.keySet(), Set.of("job1", "job4",
"job5"));
@@ -82,7 +82,7 @@ public class ControllerZkHelixUtilsTest {
// Setup job limits
ControllerConf controllerConf = mock(ControllerConf.class);
when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2);
- ControllerJobType.init(controllerConf);
+ ControllerJobTypes.init(controllerConf);
TableRebalanceProgressStats completedStats = new
TableRebalanceProgressStats();
completedStats.setStatus(RebalanceResult.Status.DONE);
@@ -91,24 +91,24 @@ public class ControllerZkHelixUtilsTest {
Map<String, Map<String, String>> jobMetadataMap = Map.of(
"job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"1000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(completedStats)),
"job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"5000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(completedStats)),
"job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"3000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(abortedStats)),
"job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"2000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(completedStats)),
"job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
"4000",
- CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name(),
+ CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name(),
RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(abortedStats))
);
Map<String, Map<String, String>> updatedJobMetadataMap =
- ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap,
ControllerJobType.TABLE_REBALANCE);
+ ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap,
ControllerJobTypes.TABLE_REBALANCE);
assertEquals(updatedJobMetadataMap.size(), 2);
// Retain the two most recent jobs based on submission time
assertEquals(updatedJobMetadataMap.keySet(), Set.of("job2", "job5"));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index eca80e5017..1d786400ea 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -44,7 +44,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
@@ -457,7 +457,7 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
private void testForceCommitInternal(String realtimeTableName, String jobId,
Set<String> consumingSegments,
long timeoutMs) {
Map<String, String> jobMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.FORCE_COMMIT);
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.FORCE_COMMIT);
assertNotNull(jobMetadata);
assertNotNull(jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 85dc9fd79b..a41d25ab41 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.utils.regex.Matcher;
import org.apache.pinot.common.utils.regex.Pattern;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -1291,10 +1291,10 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name());
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(progressStats));
- ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId,
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+ ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId,
jobMetadata, ControllerJobTypes.TABLE_REBALANCE,
prevJobMetadata -> true);
// Add a new server (to force change in instance assignment) and enable
reassignInstances to ensure that the
@@ -1313,7 +1313,7 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
progressStats.setStatus(RebalanceResult.Status.DONE);
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(progressStats));
- ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId,
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+ ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId,
jobMetadata, ControllerJobTypes.TABLE_REBALANCE,
prevJobMetadata -> true);
// Stop the added server
@@ -1335,15 +1335,16 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
String inProgressJobId =
TableRebalancer.createUniqueRebalanceJobIdentifier();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, inProgressJobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "1000");
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TABLE_REBALANCE.name());
TableRebalanceProgressStats progressStats = new
TableRebalanceProgressStats();
progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(progressStats));
ControllerZkHelixUtils.addControllerJobToZK(_propertyStore,
inProgressJobId, jobMetadata,
- ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+ ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
-
assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobType.TABLE_REBALANCE));
+ assertNotNull(
+ _helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
// Add a DONE rebalance
String doneJobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
@@ -1354,9 +1355,9 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
JsonUtils.objectToString(progressStats));
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
String.valueOf(System.currentTimeMillis()));
ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, doneJobId,
jobMetadata,
- ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+ ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
- assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId,
ControllerJobType.TABLE_REBALANCE));
+ assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId,
ControllerJobTypes.TABLE_REBALANCE));
// Add another DONE rebalance
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
"anotherTable_REALTIME");
@@ -1365,16 +1366,17 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
String.valueOf(System.currentTimeMillis() + 1000));
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, anotherDoneJobId);
ControllerZkHelixUtils.addControllerJobToZK(_propertyStore,
anotherDoneJobId, jobMetadata,
- ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+ ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
assertNotNull(
- _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId,
ControllerJobType.TABLE_REBALANCE));
+ _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId,
ControllerJobTypes.TABLE_REBALANCE));
// Verify that the first DONE job is cleaned up
- assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId,
ControllerJobType.TABLE_REBALANCE));
+ assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId,
ControllerJobTypes.TABLE_REBALANCE));
// Verify that the in-progress job is still there even though it has the
oldest submission time
-
assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobType.TABLE_REBALANCE));
+ assertNotNull(
+ _helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
private String getReloadJobIdFromResponse(String response) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java
new file mode 100644
index 0000000000..822c7d9c3e
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.controller;
+
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Interface for controller job types that store metadata in the ZK property
store.
+ */
+public interface ControllerJobType {
+
+ /**
+ * Name of the controller job type, which is used in the ZK property store
path for storing job metadata for jobs
+ * of this type.
+ */
+ String name();
+
+ /**
+ * Gets the maximum number of job metadata entries that can be stored in ZK
for this job type.
+ */
+ default Integer getZkNumJobsLimit() {
+ return CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK;
+ }
+
+ /**
+ * Checks if the job metadata entry can be safely deleted. Note that the job
metadata entry will only be attempted
+ * to be deleted when the number of entries in the job metadata map exceeds
the configured limit for the job type.
+ *
+ * @param jobMetadataEntry The job metadata entry to check - a pair of job
ID and job metadata map
+ * @return true if the job metadata entry can be safely deleted, false
otherwise
+ */
+ default boolean canDelete(Pair<String, Map<String, String>>
jobMetadataEntry) {
+ return true;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 14a81ba0a1..611f301f67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1307,6 +1307,8 @@ public class CommonConstants {
public static final String SUBMISSION_TIME_MS = "submissionTimeMs";
public static final String MESSAGE_COUNT = "messageCount";
+ public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100;
+
/**
* Segment reload job ZK props
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]