somandal commented on code in PR #15891:
URL: https://github.com/apache/pinot/pull/15891#discussion_r2138303588


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
 
   @Override
   public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+    Set<String> tables = getTenantTables(config.getTenantName());
+    Set<String> allowTables = config.getAllowTables();
+    if (!allowTables.isEmpty()) {
+      tables.retainAll(allowTables);
+    }
+    tables.removeAll(config.getBlockTables());

Review Comment:
   In the API description, let's call out that if a table is in both lists, the exclude list takes precedence.
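A minimal sketch of the precedence rule as the diff implements it: the allow list is applied first (an empty list meaning every tenant table), then the block list is removed, so a table named in both lists ends up excluded. `TenantTableFilter` and `selectTables` are hypothetical names used only for illustration; they are not part of the Pinot API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper (not a Pinot API) mirroring the filtering in the diff.
final class TenantTableFilter {
  private TenantTableFilter() {
  }

  static Set<String> selectTables(Set<String> tenantTables, Set<String> allowTables, Set<String> blockTables) {
    // Work on a copy so the caller's set is left untouched.
    Set<String> tables = new LinkedHashSet<>(tenantTables);
    // An empty allow list means "every table of the tenant".
    if (!allowTables.isEmpty()) {
      tables.retainAll(allowTables);
    }
    // Applied last, so the block (exclude) list wins over the allow list.
    tables.removeAll(blockTables);
    return tables;
  }

  public static void main(String[] args) {
    Set<String> tenantTables = Set.of("a_OFFLINE", "b_OFFLINE", "c_REALTIME");
    // "b_OFFLINE" is in both lists and ends up excluded.
    System.out.println(selectTables(tenantTables, Set.of("a_OFFLINE", "b_OFFLINE"), Set.of("b_OFFLINE")));
  }
}
```

Running `main` prints `[a_OFFLINE]`, which is the behavior the API description would need to document.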



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
 
   @Override
   public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+    Set<String> tables = getTenantTables(config.getTenantName());
+    Set<String> allowTables = config.getAllowTables();
+    if (!allowTables.isEmpty()) {
+      tables.retainAll(allowTables);
+    }
+    tables.removeAll(config.getBlockTables());
+    tables.forEach(table -> {
+      try {
+        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+        rebalanceConfig.setDryRun(true);
+        dryRunResults.put(table,
+            _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(),
+                false));
+      } catch (TableNotFoundException exception) {
+        dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null, null, null));
+      }
+    });
+    if (config.isDryRun()) {
+      return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult());
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults);
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    try {
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          while (true) {
+            String table = parallelQueue.poll();
+            if (table == null) {
+              break;
+            }
+            RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+            rebalanceConfig.setDryRun(false);
+            if (dryRunResults.get(table)
+                .getRebalanceSummaryResult()
+                .getSegmentInfo()
+                .getReplicationFactor()
+                .getExpectedValueAfterRebalance() == 1) {
+              rebalanceConfig.setMinAvailableReplicas(0);

Review Comment:
   What happens if someone explicitly sets `downtime=true`? I know you're doing this to avoid having to add an API that tracks progress by comparing the ExternalView (EV) against the IdealState (IS), but users might still want to set `downtime=true` themselves, right? In that case, handling should be added to wait for that table's rebalance to complete before moving on to the next table.
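One way such a wait could look, as a sketch under assumptions: `ConvergenceWaiter` and `waitForConvergence` are hypothetical names, and the completion check is passed in as a `BooleanSupplier` because how to detect completion (for example, by comparing EV against IS) is exactly the open question here. A worker would call it after `rebalanceTable(...)` returns for a `downtime=true` run and before polling the next table from the queue.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Pinot API): polls a caller-supplied convergence check
// until it passes or the timeout expires. How "converged" is computed
// (e.g. EV matching IS) is left to the caller.
final class ConvergenceWaiter {
  private ConvergenceWaiter() {
  }

  /** Returns true once isConverged passes; false on timeout or interruption. */
  static boolean waitForConvergence(BooleanSupplier isConverged, long timeoutMs, long pollIntervalMs) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (isConverged.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the executor can see it, then stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

Blocking the worker thread keeps per-table ordering simple; the trade-off is that a slow table occupies one of the `degreeOfParallelism` workers until it converges or times out.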



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java:
##########
@@ -43,6 +55,9 @@
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;

Review Comment:
   I have yet to check the tests, but let's ensure that they exercise both the newer parameters and the older parameters, to validate that nothing is broken.
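A table-driven sketch of the suggested coverage, assuming the new allow/block lists default to empty so that a request using only the older parameters should still select every tenant table. `TableSelectionCases` and its local `selectTables` are hypothetical stand-ins for the filtering in the diff; a real test would presumably drive `TenantRebalancer` through TestNG instead.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical, self-contained stand-in (not a Pinot API) for a table-driven test.
final class TableSelectionCases {
  private TableSelectionCases() {
  }

  /** One scenario: the allow/block lists sent and the tables expected to be rebalanced. */
  static final class Case {
    final String name;
    final Set<String> allow;
    final Set<String> block;
    final Set<String> expected;

    Case(String name, Set<String> allow, Set<String> block, Set<String> expected) {
      this.name = name;
      this.allow = allow;
      this.block = block;
      this.expected = expected;
    }
  }

  // Local copy of the filtering in the diff: allow list first, block list last.
  static Set<String> selectTables(Set<String> tenantTables, Set<String> allow, Set<String> block) {
    Set<String> tables = new TreeSet<>(tenantTables);
    if (!allow.isEmpty()) {
      tables.retainAll(allow);
    }
    tables.removeAll(block);
    return tables;
  }

  /** Returns the names of failing cases; an empty list means every case passed. */
  static List<String> runAll(Set<String> tenantTables, List<Case> cases) {
    return cases.stream()
        .filter(c -> !selectTables(tenantTables, c.allow, c.block).equals(c.expected))
        .map(c -> c.name)
        .collect(Collectors.toList());
  }
}
```

Keeping an explicit "older parameters only" case in the list makes a regression in the default path show up by name rather than as a generic assertion failure.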



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag
 
   @Override
   public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+    Set<String> tables = getTenantTables(config.getTenantName());
+    Set<String> allowTables = config.getAllowTables();
+    if (!allowTables.isEmpty()) {
+      tables.retainAll(allowTables);
+    }
+    tables.removeAll(config.getBlockTables());
+    tables.forEach(table -> {
+      try {
+        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+        rebalanceConfig.setDryRun(true);
+        dryRunResults.put(table,
+            _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(),
+                false));
+      } catch (TableNotFoundException exception) {
+        dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null, null, null));
+      }
+    });
+    if (config.isDryRun()) {
+      return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult());
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults);
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    try {
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          while (true) {
+            String table = parallelQueue.poll();
+            if (table == null) {
+              break;
+            }
+            RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+            rebalanceConfig.setDryRun(false);
+            if (dryRunResults.get(table)
+                .getRebalanceSummaryResult()
+                .getSegmentInfo()
+                .getReplicationFactor()
+                .getExpectedValueAfterRebalance() == 1) {
+              rebalanceConfig.setMinAvailableReplicas(0);
+            }
+            rebalanceTable(table, rebalanceConfig, dryRunResults.get(table).getJobId(), observer);
+          }
+          observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
+        });
+      }
+    } catch (Exception exception) {
+      observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(),
+          exception.getMessage()));
+    }
+
+    // Prepare tenant rebalance result to return
+    Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+    for (String table : dryRunResults.keySet()) {
+      RebalanceResult result = dryRunResults.get(table);
+      if (result.getStatus() == RebalanceResult.Status.DONE) {
+        rebalanceResults.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,

Review Comment:
   Just to ensure I understand: when the dry run returns `DONE`, we assume the actual status will be `IN_PROGRESS`, since a real rebalance still needs to run?
   
   Do we want to add any handling for failure statuses, or is capturing the status in the summary good enough?
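A sketch of an explicit mapping that would make this intent visible, under assumptions: the nested `Status` enum mirrors the `RebalanceResult.Status` values seen in the diff (`DONE`, `FAILED`, `IN_PROGRESS`) plus `NO_OP`, which is assumed; and a `FAILED` dry run is reported as `FAILED` rather than being turned into `IN_PROGRESS`. `StatusMapping` is a hypothetical name, not part of the PR.

```java
// Hypothetical sketch (not a Pinot API). The nested enum mirrors a subset of
// RebalanceResult.Status; NO_OP is included as an assumption.
final class StatusMapping {
  private StatusMapping() {
  }

  enum Status { NO_OP, DONE, FAILED, IN_PROGRESS }

  /** Status to report for a table, given the status of its dry run. */
  static Status reportedStatus(Status dryRunStatus) {
    switch (dryRunStatus) {
      case DONE:
        // In this sketch, a DONE dry run means a real rebalance still has to run.
        return Status.IN_PROGRESS;
      case FAILED:
        // Surface the dry-run failure (e.g. table not found) instead of hiding it.
        return Status.FAILED;
      default:
        // NO_OP and anything else pass through unchanged.
        return dryRunStatus;
    }
  }
}
```

An explicit `switch` like this makes it easy to see, and to test, what each dry-run outcome is reported as.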



-- 
This is an automated message from the Apache Git Service.