This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6639f89842 Enable ZK-based progress tracking for SegmentRelocator
rebalances (#16008)
6639f89842 is described below
commit 6639f89842f082cabf7a72cf4717166b61384402
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Jun 11 17:47:28 2025 +0100
Enable ZK-based progress tracking for SegmentRelocator rebalances (#16008)
---
.../api/resources/PinotTableRestletResource.java | 7 ++-
.../helix/core/rebalance/RebalanceChecker.java | 68 ++++++++++++++++++++-
.../core/rebalance/TableRebalanceContext.java | 39 ++++++++++--
.../core/rebalance/TableRebalanceManager.java | 20 +++++--
.../rebalance/tenant/DefaultTenantRebalancer.java | 5 +-
.../helix/core/relocation/SegmentRelocator.java | 5 +-
.../helix/core/rebalance/RebalanceCheckerTest.java | 69 ++++++++++++++++++----
.../apache/pinot/tools/PinotTableRebalancer.java | 2 +-
8 files changed, 184 insertions(+), 31 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bdb82ce766..7ca3473ffe 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -696,18 +696,19 @@ public class PinotTableRestletResource {
if (dryRun || preChecks || downtime) {
// For dry-run, preChecks or rebalance with downtime, it's fine to run
the rebalance synchronously since it
// should be a really short operation.
- return _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
+ return _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false, false);
} else {
// Make a dry-run first to get the target assignment
rebalanceConfig.setDryRun(true);
RebalanceResult dryRunResult =
- _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
+ _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false, false);
if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
rebalanceConfig.setDryRun(false);
CompletableFuture<RebalanceResult> rebalanceResultFuture =
- _tableRebalanceManager.rebalanceTableAsync(tableNameWithType,
rebalanceConfig, rebalanceJobId, true);
+ _tableRebalanceManager.rebalanceTableAsync(tableNameWithType,
rebalanceConfig, rebalanceJobId, true,
+ true);
rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> {
if (throwable != null) {
String errorMsg = String.format("Caught exception/error while
rebalancing table: %s", tableNameWithType);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index b16c6b8197..cac0315393 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -111,6 +111,11 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
@VisibleForTesting
void retryRebalanceTable(String tableNameWithType, Map<String, Map<String,
String>> allJobMetadata)
throws Exception {
+ // We first check for rebalance jobs that are stuck - i.e., those that are
IN_PROGRESS but haven't updated their
+ // status in ZK within the heartbeat timeout. This could occur if a
controller crashes while running a rebalance job
+ // for instance. These stuck jobs are always aborted here. They will also
be retried if it's allowed as per the
+ // rebalance context.
+ //
// Skip retry for the table if rebalance job is still running or has
completed, in specific:
// 1) Skip retry if any rebalance job is actively running. Being actively
running means the job is at IN_PROGRESS
// status, and has updated its status kept in ZK within the heartbeat
timeout. It's possible that more than one
@@ -118,11 +123,23 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
// 2) Skip retry if the most recently started rebalance job has completed
with DONE or NO_OP. It's possible that
// jobs started earlier may be still running, but they are ignored here.
//
- // Otherwise, we can get a list of failed rebalance jobs, i.e. those at
FAILED status; or IN_PROGRESS status but
- // haven't updated their status kept in ZK within the heartbeat timeout.
For those candidate jobs to retry:
+ // Note that it should be very unlikely to have scenarios where there are
more than one rebalance jobs running
+ // for a table at the same time, or to have a rebalance job that started
earlier but completed later than the one
+ // started most recently since we try to prevent new rebalances from being
triggered while a rebalance is in
+ // progress by checking ZK metadata. Such scenarios can only occur if
multiple rebalance jobs are triggered at the
+ // same time and the second one is started before the first one updates
its status in ZK.
+ //
+ // If we detect that a retry is required based on the above criteria, we
can get a list of failed rebalance jobs,
+ // i.e. those at FAILED status; or IN_PROGRESS status but haven't updated
their status kept in ZK within the
+ // heartbeat timeout. For those candidate jobs to retry:
// 1) Firstly, group them by the original jobIds they retry for so that we
can skip those exceeded maxRetry.
// 2) For the remaining jobs, we take the one started most recently and
retry it with its original configs.
// 3) If configured, we can abort the other rebalance jobs for the table
by setting their status to FAILED.
+
+ if (hasStuckInProgressJobs(tableNameWithType, allJobMetadata)) {
+ abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+ }
+
Map<String/*original jobId*/, Set<Pair<TableRebalanceContext/*job
attempts*/, Long
/*startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType,
allJobMetadata);
if (candidateJobs.isEmpty()) {
@@ -147,7 +164,7 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
retryDelayMs);
return;
}
- abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+
// Get tableConfig only when the table needs to retry rebalance, and get
it before submitting rebalance to another
// thread, in order to avoid unnecessary ZK reads and making too many ZK
reads in a short time.
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -250,6 +267,47 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
return candidateJobRun;
}
+  /**
+   * Check if there are any rebalance jobs that are stuck in IN_PROGRESS status, i.e., they have not updated their
+   * status in ZK within their configured heartbeat timeout. Every job is checked, including non-retriable ones.
+   * @param tableNameWithType the table name with type
+   * @param allJobMetadata the metadata of all rebalance jobs for the table
+   * @return true if at least one rebalance job is stuck, false otherwise
+   * @throws Exception if a job's progress stats or rebalance context cannot be deserialized
+   */
+  @VisibleForTesting
+  static boolean hasStuckInProgressJobs(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata)
+      throws Exception {
+    for (Map.Entry<String, Map<String, String>> entry : allJobMetadata.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobMetadata = entry.getValue();
+      long statsUpdatedAt = Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+      String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+      if (StringUtils.isEmpty(jobStatsInStr)) {
+        // Skip rebalance job as it has no job progress stats
+        continue;
+      }
+      String jobCtxInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+      if (StringUtils.isEmpty(jobCtxInStr)) {
+        // Skip rebalance job as it has no job context
+        continue;
+      }
+      TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class);
+      if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+        long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
+        if (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs) {
+          LOGGER.info("Found stuck rebalance job: {} for table: {} that has not updated its status in ZK within "
+              + "heartbeat timeout: {}", jobId, tableNameWithType, heartbeatTimeoutMs);
+          return true;
+        }
+        // This job is still heartbeating. Keep scanning instead of returning false here: another IN_PROGRESS job
+        // for the table may be stuck, and the result must not depend on the map's iteration order.
+      }
+    }
+    return false;
+  }
+
@VisibleForTesting
static Map<String, Set<Pair<TableRebalanceContext, Long>>>
getCandidateJobs(String tableNameWithType,
Map<String, Map<String, String>> allJobMetadata)
@@ -279,6 +337,10 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
}
TableRebalanceProgressStats jobStats =
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr,
TableRebalanceContext.class);
+ if (!jobCtx.getAllowRetries()) {
+ LOGGER.info("Skip rebalance job: {} as it does not allow retries",
jobId);
+ continue;
+ }
long jobStartTimeMs = jobStats.getStartTimeMs();
if (latestStartedJob == null || latestStartedJob.getRight() <
jobStartTimeMs) {
latestStartedJob = Pair.of(jobId, jobStartTimeMs);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
index 48e43558cc..aea3a2ff0d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
@@ -27,24 +27,45 @@ public class TableRebalanceContext {
private String _originalJobId;
private RebalanceConfig _config;
private int _attemptId;
+ // Default to true for all user initiated rebalances, so that they can be
retried if they fail or get stuck.
+ private boolean _allowRetries = true;
- public static TableRebalanceContext forInitialAttempt(String originalJobId,
RebalanceConfig config) {
- return new TableRebalanceContext(originalJobId, config,
INITIAL_ATTEMPT_ID);
+ /**
+ * Creates a new TableRebalanceContext for the initial attempt of a
rebalance job.
+ *
+ * @param originalJobId The original job ID for the rebalance job.
+ * @param config The rebalance configuration.
+ * @param allowRetries Whether retries are allowed for this rebalance job.
This isn't part of {@link RebalanceConfig}
+ * because user initiated rebalances should always
support retries for failed and stuck jobs.
+ * @return A new TableRebalanceContext instance.
+ */
+ public static TableRebalanceContext forInitialAttempt(String originalJobId,
RebalanceConfig config,
+ boolean allowRetries) {
+ return new TableRebalanceContext(originalJobId, config,
INITIAL_ATTEMPT_ID, allowRetries);
}
+ /**
+ * Creates a new TableRebalanceContext for a retry attempt of a rebalance
job.
+ *
+ * @param originalJobId The original job ID for the rebalance job.
+ * @param config The rebalance configuration.
+ * @param attemptId The attempt ID for the retry.
+ * @return A new TableRebalanceContext instance.
+ */
public static TableRebalanceContext forRetry(String originalJobId,
RebalanceConfig config, int attemptId) {
- return new TableRebalanceContext(originalJobId, config, attemptId);
+ return new TableRebalanceContext(originalJobId, config, attemptId, true);
}
public TableRebalanceContext() {
// For JSON deserialization.
}
- private TableRebalanceContext(String originalJobId, RebalanceConfig config,
int attemptId) {
+ private TableRebalanceContext(String originalJobId, RebalanceConfig config,
int attemptId, boolean allowRetries) {
_jobId = createAttemptJobId(originalJobId, attemptId);
_originalJobId = originalJobId;
_config = config;
_attemptId = attemptId;
+ _allowRetries = allowRetries;
}
public int getAttemptId() {
@@ -79,10 +100,18 @@ public class TableRebalanceContext {
_config = config;
}
+ public boolean getAllowRetries() {
+ return _allowRetries;
+ }
+
+ public void setAllowRetries(boolean allowRetries) {
+ _allowRetries = allowRetries;
+ }
+
@Override
public String toString() {
return "TableRebalanceContext{" + "_jobId='" + _jobId + '\'' + ",
_originalJobId='" + _originalJobId + '\''
- + ", _config=" + _config + ", _attemptId=" + _attemptId + '}';
+ + ", _config=" + _config + ", _attemptId=" + _attemptId + ",
_allowRetries=" + _allowRetries + "}";
}
private static String createAttemptJobId(String originalJobId, int
attemptId) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
index 87de12b3d7..b700582593 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -79,12 +79,19 @@ public class TableRebalanceManager {
* @param rebalanceConfig configuration for the rebalance operation
* @param rebalanceJobId ID of the rebalance job, which is used to track the
progress of the rebalance operation
* @param trackRebalanceProgress whether to track rebalance progress stats
in ZK
+ * @param allowRetries whether to allow retries for failed or stuck
rebalance operations (through
+ * {@link RebalanceChecker}). Requires {@code
trackRebalanceProgress} to be true.
* @return result of the rebalance operation
* @throws TableNotFoundException if the table does not exist
+ * @throws RebalanceInProgressException if a rebalance job is already in
progress for the table (as per ZK metadata)
*/
public RebalanceResult rebalanceTable(String tableNameWithType,
RebalanceConfig rebalanceConfig,
- String rebalanceJobId, boolean trackRebalanceProgress)
+ String rebalanceJobId, boolean trackRebalanceProgress, boolean
allowRetries)
throws TableNotFoundException, RebalanceInProgressException {
+ if (allowRetries && !trackRebalanceProgress) {
+ throw new IllegalArgumentException(
+ "Rebalance retries are only supported when rebalance progress is
tracked in ZK");
+ }
TableConfig tableConfig =
_resourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
@@ -93,7 +100,7 @@ public class TableRebalanceManager {
ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
if (trackRebalanceProgress) {
zkBasedTableRebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
- TableRebalanceContext.forInitialAttempt(rebalanceJobId,
rebalanceConfig),
+ TableRebalanceContext.forInitialAttempt(rebalanceJobId,
rebalanceConfig, allowRetries),
_resourceManager.getPropertyStore());
}
return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId,
rebalanceConfig,
@@ -109,11 +116,14 @@ public class TableRebalanceManager {
* @param rebalanceConfig configuration for the rebalance operation
* @param rebalanceJobId ID of the rebalance job, which is used to track the
progress of the rebalance operation
* @param trackRebalanceProgress whether to track rebalance progress stats
in ZK
+ * @param allowRetries whether to allow retries for failed or stuck
rebalance operations (through
+ * {@link RebalanceChecker}). Requires {@code
trackRebalanceProgress} to be true.
* @return a CompletableFuture that will complete with the result of the
rebalance operation
* @throws TableNotFoundException if the table does not exist
+ * @throws RebalanceInProgressException if a rebalance job is already in
progress for the table (as per ZK metadata)
*/
public CompletableFuture<RebalanceResult> rebalanceTableAsync(String
tableNameWithType,
- RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean
trackRebalanceProgress)
+ RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean
trackRebalanceProgress, boolean allowRetries)
throws TableNotFoundException, RebalanceInProgressException {
TableConfig tableConfig =
_resourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
@@ -125,7 +135,8 @@ public class TableRebalanceManager {
return CompletableFuture.supplyAsync(
() -> {
try {
- return rebalanceTable(tableNameWithType, rebalanceConfig,
rebalanceJobId, trackRebalanceProgress);
+ return rebalanceTable(tableNameWithType, rebalanceConfig,
rebalanceJobId, trackRebalanceProgress,
+ allowRetries);
} catch (TableNotFoundException e) {
// Should not happen since we already checked for table existence
throw new RuntimeException(e);
@@ -147,6 +158,7 @@ public class TableRebalanceManager {
* @param rebalanceConfig configuration for the rebalance operation
* @param zkBasedTableRebalanceObserver observer to track rebalance progress
in ZK
* @return a CompletableFuture that will complete with the result of the
rebalance operation
+ * @throws RebalanceInProgressException if a rebalance job is already in
progress for the table (as per ZK metadata)
*/
public CompletableFuture<RebalanceResult> rebalanceTableAsync(String
tableNameWithType, TableConfig tableConfig,
String rebalanceJobId, RebalanceConfig rebalanceConfig,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index 369024faa6..3f288ef5ff 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -61,7 +61,8 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
rebalanceConfig.setDryRun(true);
rebalanceResult.put(table,
- _tableRebalanceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(), false));
+ _tableRebalanceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(), false,
+ false));
} catch (TableNotFoundException | RebalanceInProgressException
exception) {
rebalanceResult.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
null, null, null, null, null));
@@ -203,7 +204,7 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
TenantRebalanceObserver observer) {
try {
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER,
tableName, rebalanceJobId);
- RebalanceResult result =
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
+ RebalanceResult result =
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true,
true);
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index b2144260c0..fd10148ce3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -232,8 +232,11 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
// We're not using the async rebalance API here because we want to run
this on a separate thread pool from the
// rebalance thread pool that is used for user initiated rebalances.
+
+ // Retries are disabled because SegmentRelocator itself is a periodic
controller task, so we don't want the
+ // RebalanceChecker to unnecessarily retry any such failed rebalances.
RebalanceResult rebalance =
_tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
- TableRebalancer.createUniqueRebalanceJobIdentifier(), false);
+ TableRebalancer.createUniqueRebalanceJobIdentifier(), true, false);
switch (rebalance.getStatus()) {
case NO_OP:
LOGGER.info("All segments are already relocated for table: {}",
tableNameWithType);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index d88b1319c1..373815c969 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -51,10 +51,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
public class RebalanceCheckerTest {
@@ -87,7 +84,7 @@ public class RebalanceCheckerTest {
TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.FAILED);
stats.setStartTimeMs(1000);
- TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
allJobMetadata.put("job1", jobMetadata);
// 3 failed retry runs for job1
@@ -104,7 +101,7 @@ public class RebalanceCheckerTest {
stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.FAILED);
stats.setStartTimeMs(2000);
- jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true);
jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job2", stats, jobCtx);
allJobMetadata.put("job2", jobMetadata);
jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100,
RebalanceResult.Status.DONE);
@@ -116,7 +113,7 @@ public class RebalanceCheckerTest {
stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
stats.setStartTimeMs(3000);
- jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true);
jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job3", stats, jobCtx);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
allJobMetadata.put("job3", jobMetadata);
@@ -152,11 +149,37 @@ public class RebalanceCheckerTest {
stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.DONE);
stats.setStartTimeMs(5000);
- jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg, true);
jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job5", stats, jobCtx);
allJobMetadata.put("job5", jobMetadata);
jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
assertEquals(jobs.size(), 0);
+
+ // Add job6 that doesn't support retries as per its rebalance context
(used by system initiated rebalances in
+ // practice).
+ jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.FAILED);
+ stats.setStartTimeMs(5000);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job6", jobCfg, false);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job6", stats, jobCtx);
+ allJobMetadata.put("job6", jobMetadata);
+ jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
+ assertEquals(jobs.size(), 0);
+
+ // Ensure that a job serialized using an older version of
TableRebalanceContext without the allowRetries field is
+ // retriable by default.
+ jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+
"{\"jobId\":\"job6\",\"attemptId\":1,\"config\":{\"maxAttempts\":4,\"dryRun\":false,\"preChecks\":false,"
+ +
"\"bootstrap\":false,\"downtime\":false,\"lowDiskMode\":false,\"bestEfforts\":false,"
+ +
"\"reassignInstances\":false,\"includeConsuming\":false,\"batchSizePerServer\":-1,"
+ +
"\"updateTargetTier\":false,\"externalViewCheckIntervalInMs\":1000,\"minAvailableReplicas\":1,"
+ +
"\"heartbeatIntervalInMs\":300000,\"heartbeatTimeoutInMs\":3600000,\"retryInitialDelayInMs\":300000,"
+ +
"\"minimizeDataMovement\":\"ENABLE\",\"externalViewStabilizationTimeoutInMs\":3600000},"
+ + "\"originalJobId\":\"job6\"}, tableName=table01}");
+ jobs = RebalanceChecker.getCandidateJobs(tableName, Map.of("job6",
jobMetadata));
+ assertEquals(jobs.size(), 1);
}
@Test
@@ -192,6 +215,28 @@ public class RebalanceCheckerTest {
assertNull(jobTime);
}
+ @Test
+ public void testStuckInProgressJobs()
+ throws Exception {
+ String tableName = "table01";
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+
+ assertFalse(RebalanceChecker.hasStuckInProgressJobs(tableName,
allJobMetadata));
+
+ RebalanceConfig jobCfg = new RebalanceConfig();
+ jobCfg.setHeartbeatTimeoutInMs(10_000);
+ TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+ // Even though allowRetries is false, we still abort stuck jobs (heartbeat
timeout exceeded).
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg, false);
+ Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis() - 20_000));
+ allJobMetadata.put("job1", jobMetadata);
+
+ assertTrue(RebalanceChecker.hasStuckInProgressJobs(tableName,
allJobMetadata));
+ }
+
@Test
public void testRetryRebalance()
throws Exception {
@@ -208,7 +253,7 @@ public class RebalanceCheckerTest {
TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.FAILED);
stats.setStartTimeMs(1000);
- TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
allJobMetadata.put("job1", jobMetadata);
// 3 failed retry runs for job1
@@ -225,7 +270,7 @@ public class RebalanceCheckerTest {
stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.FAILED);
stats.setStartTimeMs(2000);
- jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true);
jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job2", stats, jobCtx);
allJobMetadata.put("job2", jobMetadata);
jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100,
RebalanceResult.Status.DONE);
@@ -237,7 +282,7 @@ public class RebalanceCheckerTest {
stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
stats.setStartTimeMs(3000);
- jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true);
jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job3", stats, jobCtx);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
allJobMetadata.put("job3", jobMetadata);
@@ -284,7 +329,7 @@ public class RebalanceCheckerTest {
TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
stats.setStatus(RebalanceResult.Status.FAILED);
stats.setStartTimeMs(nowMs);
- TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
allJobMetadata.put("job1", jobMetadata);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index f935a22ba7..54778de2fe 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -71,7 +71,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
}
ZkBasedTableRebalanceObserver rebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
- TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig),
_propertyStore);
+ TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig,
true), _propertyStore);
return new TableRebalancer(_helixManager, rebalanceObserver, null, null,
null)
.rebalance(tableConfig, _rebalanceConfig, jobId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]