somandal commented on code in PR #15891:
URL: https://github.com/apache/pinot/pull/15891#discussion_r2138303588
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+ Set<String> tables = getTenantTables(config.getTenantName());
+ Set<String> allowTables = config.getAllowTables();
+ if (!allowTables.isEmpty()) {
+ tables.retainAll(allowTables);
+ }
+ tables.removeAll(config.getBlockTables());
Review Comment:
In the API description, let's call out that if a table is in both lists, the
exclude list takes precedence.
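To make the precedence concrete, here is a minimal sketch that mirrors the `retainAll`/`removeAll` order in the diff above. The class name `TableFilterSketch`, the method `selectTables`, and the table names are hypothetical and not part of the PR.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the PR.
public class TableFilterSketch {
  // Applies the allow list first (when non-empty), then the block list,
  // so a table present in both lists ends up excluded.
  public static Set<String> selectTables(Set<String> tenantTables, Set<String> allowTables,
      Set<String> blockTables) {
    Set<String> selected = new HashSet<>(tenantTables);
    if (!allowTables.isEmpty()) {
      selected.retainAll(allowTables);
    }
    selected.removeAll(blockTables);
    return selected;
  }

  public static void main(String[] args) {
    Set<String> tenantTables = Set.of("a_OFFLINE", "b_OFFLINE", "c_OFFLINE");
    Set<String> allow = Set.of("a_OFFLINE", "b_OFFLINE");
    Set<String> block = Set.of("b_OFFLINE");
    // b_OFFLINE is in both lists and is dropped; c_OFFLINE is not allowed.
    System.out.println(selectTables(tenantTables, allow, block)); // prints [a_OFFLINE]
  }
}
```

An empty allow list means "all tenant tables", so the block list is then the only filter.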
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+ Set<String> tables = getTenantTables(config.getTenantName());
+ Set<String> allowTables = config.getAllowTables();
+ if (!allowTables.isEmpty()) {
+ tables.retainAll(allowTables);
+ }
+ tables.removeAll(config.getBlockTables());
+ tables.forEach(table -> {
+ try {
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(true);
+ dryRunResults.put(table,
+ _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(),
+ false));
+ } catch (TableNotFoundException exception) {
+ dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+ null, null, null, null, null));
+ }
+ });
+ if (config.isDryRun()) {
+ return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult());
+ }
+
+ String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+ TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+ tables, _pinotHelixResourceManager);
+ observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+ ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults);
+ // ensure atleast 1 thread is created to run the sequential table rebalance operations
+ int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+ try {
+ for (int i = 0; i < parallelism; i++) {
+ _executorService.submit(() -> {
+ while (true) {
+ String table = parallelQueue.poll();
+ if (table == null) {
+ break;
+ }
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
+ if (dryRunResults.get(table)
+ .getRebalanceSummaryResult()
+ .getSegmentInfo()
+ .getReplicationFactor()
+ .getExpectedValueAfterRebalance() == 1) {
+ rebalanceConfig.setMinAvailableReplicas(0);
Review Comment:
What happens if someone explicitly sets `downtime=true`? I know you're doing
this to avoid having to add an API that tracks progress by comparing EV-IS, but
users might still want to set `downtime=true` themselves, right? In that case,
handling should be added to wait for the rebalance to complete before moving on
to the next table.
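One possible shape for that wait, as a rough sketch only: poll until the external view matches the ideal state or a timeout elapses. Everything here is hypothetical (the class `ConvergenceWaitSketch`, the supplier-based signature, the timeout handling), and strict map equality is a simplification of whatever convergence check the rebalancer would actually need.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch for illustration; none of these names exist in Pinot.
public class ConvergenceWaitSketch {
  // Polls until the external view equals the ideal state, or the timeout elapses.
  // Each map is segment name -> (instance name -> state). Returns true on convergence,
  // false on timeout or interruption.
  public static boolean waitForConvergence(Supplier<Map<String, Map<String, String>>> idealState,
      Supplier<Map<String, Map<String, String>>> externalView, long timeoutMs, long pollIntervalMs) {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (idealState.get().equals(externalView.get())) {
        return true;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

With something like this, the worker loop could block on each `downtime=true` table before polling the next one from the queue, at the cost of holding a thread for the duration of the wait.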
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java:
##########
@@ -43,6 +55,9 @@
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
Review Comment:
I have yet to check the tests, but let's make sure they cover both the newer
parameters and the older parameters to validate that nothing is broken.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
@Override
public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+ Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+ Set<String> tables = getTenantTables(config.getTenantName());
+ Set<String> allowTables = config.getAllowTables();
+ if (!allowTables.isEmpty()) {
+ tables.retainAll(allowTables);
+ }
+ tables.removeAll(config.getBlockTables());
+ tables.forEach(table -> {
+ try {
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(true);
+ dryRunResults.put(table,
+ _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(),
+ false));
+ } catch (TableNotFoundException exception) {
+ dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+ null, null, null, null, null));
+ }
+ });
+ if (config.isDryRun()) {
+ return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult());
+ }
+
+ String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+ TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+ tables, _pinotHelixResourceManager);
+ observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+ ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults);
+ // ensure atleast 1 thread is created to run the sequential table rebalance operations
+ int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+ try {
+ for (int i = 0; i < parallelism; i++) {
+ _executorService.submit(() -> {
+ while (true) {
+ String table = parallelQueue.poll();
+ if (table == null) {
+ break;
+ }
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
+ if (dryRunResults.get(table)
+ .getRebalanceSummaryResult()
+ .getSegmentInfo()
+ .getReplicationFactor()
+ .getExpectedValueAfterRebalance() == 1) {
+ rebalanceConfig.setMinAvailableReplicas(0);
+ }
+ rebalanceTable(table, rebalanceConfig, dryRunResults.get(table).getJobId(), observer);
+ }
+ observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
+ });
+ }
+ } catch (Exception exception) {
+ observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(),
+ exception.getMessage()));
+ }
+
+ // Prepare tenant rebalance result to return
+ Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+ for (String table : dryRunResults.keySet()) {
+ RebalanceResult result = dryRunResults.get(table);
+ if (result.getStatus() == RebalanceResult.Status.DONE) {
+ rebalanceResults.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
Review Comment:
Just to make sure I understand: when we get a `DONE` status from the dry run, we
assume the actual status will be `IN_PROGRESS`, since a rebalance will need to
be run?
Do we want to add any handling for failure statuses, or is capturing the
status in the summary good enough?
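For discussion, one possible mapping is sketched below: a dry-run `DONE` is reported as `IN_PROGRESS`, and every other status is passed through unchanged so that failures stay visible per table. The class `StatusMappingSketch` and its local `Status` enum are hypothetical stand-ins, not the PR's code or the real `RebalanceResult.Status`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch for illustration; not the PR's implementation.
public class StatusMappingSketch {
  // Stand-in for RebalanceResult.Status, limited to the values used here.
  public enum Status { NO_OP, DONE, FAILED, IN_PROGRESS }

  // A dry-run DONE means a real rebalance was submitted, so report IN_PROGRESS;
  // any other dry-run status (for example FAILED or NO_OP) is returned as-is.
  public static Status toReturnedStatus(Status dryRunStatus) {
    return dryRunStatus == Status.DONE ? Status.IN_PROGRESS : dryRunStatus;
  }

  public static Map<String, Status> toReturnedStatuses(Map<String, Status> dryRunStatuses) {
    Map<String, Status> returned = new LinkedHashMap<>();
    dryRunStatuses.forEach((table, status) -> returned.put(table, toReturnedStatus(status)));
    return returned;
  }

  public static void main(String[] args) {
    Map<String, Status> dryRun = new LinkedHashMap<>();
    dryRun.put("a_OFFLINE", Status.DONE);
    dryRun.put("b_OFFLINE", Status.FAILED);
    System.out.println(toReturnedStatuses(dryRun)); // prints {a_OFFLINE=IN_PROGRESS, b_OFFLINE=FAILED}
  }
}
```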