This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 97c11b7217 Always balance root and metadata tables (#4709)
97c11b7217 is described below
commit 97c11b721710545cbc52e97c32b23f2b4caf1048
Author: Dave Marion <[email protected]>
AuthorDate: Fri Jul 5 15:50:01 2024 -0400
Always balance root and metadata tables (#4709)
Fixes #4515
---
.../core/metadata/schema/DataLevelOrderTest.java | 40 +++++++++++++++
.../java/org/apache/accumulo/manager/Manager.java | 60 ++++++++++++++++------
2 files changed, 84 insertions(+), 16 deletions(-)
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/DataLevelOrderTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DataLevelOrderTest.java
new file mode 100644
index 0000000000..33dea77918
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DataLevelOrderTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Guards the declaration order of {@link DataLevel}.
+ */
+public class DataLevelOrderTest {
+
+  @Test
+  public void testDataLevelOrder() {
+    // Code may iterate DataLevel.values() and rely on its order (ROOT, then METADATA,
+    // then USER). Comparing the whole array also detects a level being added or removed.
+    DataLevel[] expectedOrder = {DataLevel.ROOT, DataLevel.METADATA, DataLevel.USER};
+    DataLevel[] actualOrder = DataLevel.values();
+    assertArrayEquals(expectedOrder, actualOrder);
+  }
+
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 92c3453e45..406178d1de 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -912,8 +912,6 @@ public class Manager extends AbstractServer
       if (!badServers.isEmpty()) {
         log.debug("not balancing because the balance information is out-of-date {}",
             badServers.keySet());
-      } else if (notHosted() > 0) {
-        log.debug("not balancing because there are unhosted tablets: {}", notHosted());
       } else if (getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
         log.debug("not balancing because the manager is attempting to stop cleanly");
       } else if (!serversToShutdown.isEmpty()) {
@@ -959,27 +957,72 @@ public class Manager extends AbstractServer
     }
 
     private long balanceTablets() {
-      BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
-          tserverStatus, migrationsSnapshot());
-      long wait = tabletBalancer.balance(params);
-
-      for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
-          params.migrationsOut())) {
-        KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
-        if (migrations.containsKey(ke)) {
-          log.warn("balancer requested migration more than once, skipping {}", m);
+      // Partition the in-progress migrations by DataLevel. Every level gets an entry, even
+      // one with no pending migrations, so that an idle level is still offered to the
+      // balancer rather than skipped.
+      final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+          new HashMap<>(DataLevel.values().length);
+      for (DataLevel dl : DataLevel.values()) {
+        partitionedMigrations.put(dl, new HashSet<>());
+      }
+      migrationsSnapshot()
+          .forEach(ke -> partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke));
+
+      final int tabletsNotHosted = notHosted();
+      long wait = 0;
+      long totalMigrationsOut = 0;
+      // Relies on DataLevel.values() being ordered ROOT, METADATA, USER (pinned by
+      // DataLevelOrderTest) so root and metadata tablets are balanced before user tablets.
+      for (DataLevel dl : DataLevel.values()) {
+        if (dl == DataLevel.USER && tabletsNotHosted > 0) {
+          log.debug("not balancing user tablets because there are {} unhosted tablets",
+              tabletsNotHosted);
           continue;
         }
-        TServerInstance tserverInstance = TabletServerIdImpl.toThrift(m.getNewTabletServer());
-        migrations.put(ke, tserverInstance);
-        log.debug("migration {}", m);
+        // Only this level's pending migrations are passed to the balancer. Migrations
+        // scheduled below are added to this set so later passes see them as pending.
+        final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
+        long scheduledThisPass;
+        int pass = 0;
+        do {
+          pass++;
+          log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, pass);
+          BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
+              tserverStatus, migrationsForLevel);
+          wait = Math.max(tabletBalancer.balance(params), wait);
+          scheduledThisPass = 0;
+          for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+              params.migrationsOut())) {
+            final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
+            // The status handed to the balancer is not filtered by level, so it may
+            // propose migrations for other levels. Only act on this level's tablets so
+            // that, e.g., user tablets are not moved while tablets are unhosted.
+            if (DataLevel.of(ke.tableId()) != dl) {
+              log.trace("ignoring migration for a tablet outside level {}: {}", dl, m);
+              continue;
+            }
+            if (migrations.containsKey(ke)) {
+              log.warn("balancer requested migration more than once, skipping {}", m);
+              continue;
+            }
+            migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+            migrationsForLevel.add(ke);
+            scheduledThisPass++;
+            log.debug("migration {}", m);
+          }
+          totalMigrationsOut += scheduledThisPass;
+          // ROOT and METADATA are re-balanced until a pass schedules nothing new. Counting
+          // only newly scheduled migrations, rather than everything the balancer returned,
+          // keeps this loop from spinning when the balancer repeats pending requests.
+        } while (scheduledThisPass > 0 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA));
       }
-      if (params.migrationsOut().isEmpty()) {
+
+      if (totalMigrationsOut == 0) {
         synchronized (balancedNotifier) {
           balancedNotifier.notifyAll();
         }
       } else {
-        nextEvent.event("Migrating %d more tablets, %d total", params.migrationsOut().size(),
+        nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut,
             migrations.size());
       }
       return wait;