This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new b0b9555b13 Added TabletServer in WaitForBalanceIT (#4969)
b0b9555b13 is described below
commit b0b9555b1314e2707d8c36a34320c7b38361c046
Author: Dave Marion <[email protected]>
AuthorDate: Thu Oct 10 14:21:04 2024 -0400
Added TabletServer in WaitForBalanceIT (#4969)
WaitForBalanceIT was assuming that the tablets were not balanced
after adding splits. In the last step of the split code, in
DeleteOperationIds,
the EventCoordinator is notified to try and get the new splits hosted as
fast
as possible. This commit adds a new TabletServer which will cause a
balancing to occur. I also modified the isBalanced method to take into
account the tservers in the cluster, not just the location from the
metadata.
---
.../org/apache/accumulo/test/WaitForBalanceIT.java | 33 ++++++++++++++--------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
index fe1b7891c3..2c08441f5a 100644
--- a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
@@ -18,7 +18,7 @@
*/
package org.apache.accumulo.test;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -40,6 +41,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.io.Text;
@@ -57,6 +59,7 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
@Test
public void test() throws Exception {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
+ assertEquals(2,
c.instanceOperations().getServers(Type.TABLET_SERVER).size());
// ensure the metadata table is online
try (Scanner scanner =
c.createScanner(AccumuloTable.METADATA.tableName(),
Authorizations.EMPTY)) {
@@ -74,14 +77,22 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
partitionKeys.add(new Text("" + i));
}
c.tableOperations().addSplits(tableName, partitionKeys);
- assertFalse(isBalanced(c));
c.instanceOperations().waitForBalance();
- Wait.waitFor(() -> isBalanced(c));
+ assertTrue(isBalanced(c));
+
+ // Add another tserver to force a rebalance
+
getCluster().getConfig().getClusterServerConfiguration().setNumDefaultTabletServers(3);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+ Wait.waitFor(() ->
c.instanceOperations().getServers(Type.TABLET_SERVER).size() == 3);
+ c.instanceOperations().waitForBalance();
+ assertTrue(isBalanced(c));
}
}
private boolean isBalanced(AccumuloClient c) throws Exception {
- final Map<String,Integer> counts = new HashMap<>();
+ final Map<String,Integer> tserverCounts = new HashMap<>();
+ c.instanceOperations().getServers(Type.TABLET_SERVER)
+ .forEach(ts -> tserverCounts.put(ts.toHostPortString(), 0));
int offline = 0;
for (String tableName : new String[] {AccumuloTable.METADATA.tableName(),
AccumuloTable.ROOT.tableName()}) {
@@ -93,17 +104,17 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
for (Entry<Key,Value> entry : s) {
Key key = entry.getKey();
if (key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
- location = key.getColumnQualifier().toString();
+ location = entry.getValue().toString();
} else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
if (location == null) {
offline++;
} else {
- Integer count = counts.get(location);
+ Integer count = tserverCounts.get(location);
if (count == null) {
count = 0;
}
count = count + 1;
- counts.put(location, count);
+ tserverCounts.put(location, count);
}
location = null;
}
@@ -115,13 +126,13 @@ public class WaitForBalanceIT extends ConfigurableMacBase
{
return false;
}
int average = 0;
- for (Integer i : counts.values()) {
+ for (Integer i : tserverCounts.values()) {
average += i;
}
- average /= counts.size();
- System.out.println(counts);
+ average /= tserverCounts.size();
+ System.out.println(tserverCounts);
int tablesCount = c.tableOperations().list().size();
- for (Entry<String,Integer> hostCount : counts.entrySet()) {
+ for (Entry<String,Integer> hostCount : tserverCounts.entrySet()) {
if (Math.abs(average - hostCount.getValue()) > tablesCount) {
System.out.println(
"Average " + average + " count " + hostCount.getKey() + ": " +
hostCount.getValue());