This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 9186574872 Fixes metadata balancing issues (#5358)
9186574872 is described below
commit 918657487286d2df9af0ea648bdfe77dc59ea1c2
Author: Daniel Roberts <[email protected]>
AuthorDate: Wed Feb 26 13:29:05 2025 -0500
Fixes metadata balancing issues (#5358)
* Add new metadata balance test to BalanceIT
Adds a new test for checking balancing of the metadata table.
Before the fixes in this change, this test failed because of outstanding
bugs in 2.1 related to balancing the metadata table.
* Fixes balance-related filtering that was using table names instead of table IDs.
This change passes each balancer a map of the tables it should balance.
This reduces the possibility that a balancer will attempt
to balance tables that are not assigned to it.
Removes the while loop in favor of skipping the current data level
when a level it depends on still has pending migrations (ROOT for the
METADATA and USER levels, and METADATA for the USER level).
---------
Co-authored-by: Keith Turner <[email protected]>
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../core/manager/balancer/BalanceParamsImpl.java | 22 ++++-
.../core/spi/balancer/BalancerEnvironment.java | 5 +
.../spi/balancer/HostRegexTableLoadBalancer.java | 6 +-
.../core/spi/balancer/TableLoadBalancer.java | 14 +--
.../accumulo/core/spi/balancer/TabletBalancer.java | 10 ++
.../core/spi/balancer/GroupBalancerTest.java | 9 +-
...tRegexTableLoadBalancerReconfigurationTest.java | 4 +-
.../balancer/HostRegexTableLoadBalancerTest.java | 14 +--
.../core/spi/balancer/SimpleLoadBalancerTest.java | 4 +-
.../core/spi/balancer/TableLoadBalancerTest.java | 6 +-
.../java/org/apache/accumulo/manager/Manager.java | 103 ++++++++++++---------
.../java/org/apache/accumulo/test/BalanceIT.java | 96 ++++++++++++++++++-
.../accumulo/test/ChaoticLoadBalancerTest.java | 2 +-
13 files changed, 213 insertions(+), 82 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
index 97b9315c6e..00f593d9f3 100644
---
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
@@ -20,10 +20,12 @@ package org.apache.accumulo.core.manager.balancer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,38 +44,43 @@ public class BalanceParamsImpl implements
TabletBalancer.BalanceParameters {
private final SortedMap<TServerInstance,TabletServerStatus>
thriftCurrentStatus;
private final Set<KeyExtent> thriftCurrentMigrations;
private final DataLevel currentDataLevel;
+ private final Map<String,TableId> tablesToBalance;
public static BalanceParamsImpl
fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
- Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
+ Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
+ Map<String,TableId> tablesToBalance) {
Set<TabletId> currentMigrations =
thriftCurrentMigrations.stream().map(TabletIdImpl::new)
.collect(Collectors.toUnmodifiableSet());
return new BalanceParamsImpl(currentStatus, currentMigrations, new
ArrayList<>(),
- thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
+ thriftCurrentStatus, thriftCurrentMigrations, currentLevel,
tablesToBalance);
}
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus>
currentStatus,
- Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
- DataLevel currentLevel) {
+ Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
DataLevel currentLevel,
+ Map<String,TableId> tablesToBalance) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = null;
this.thriftCurrentMigrations = null;
this.currentDataLevel = currentLevel;
+ this.tablesToBalance = tablesToBalance;
}
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus>
currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
- Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
+ Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
+ Map<String,TableId> tablesToBalance) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = thriftCurrentStatus;
this.thriftCurrentMigrations = thriftCurrentMigrations;
this.currentDataLevel = currentLevel;
+ this.tablesToBalance = tablesToBalance;
}
@Override
@@ -110,4 +117,9 @@ public class BalanceParamsImpl implements
TabletBalancer.BalanceParameters {
public String currentLevel() {
return currentDataLevel.name();
}
+
+ @Override
+ public Map<String,TableId> getTablesToBalance() {
+ return tablesToBalance;
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
index 733e847fe5..88aaf60d9a 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends
ServiceEnvironment {
* Many Accumulo plugins are given table IDs as this is what Accumulo uses
internally to identify
* tables. This provides a mapping of table names to table IDs for the
purposes of translating
* and/or enumerating the existing tables.
+ *
+ * <p>
+ * This returns all tables that exist in the system. Each request to
balance should limit itself
+ * to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not
balance everything
+ * returned by this.
*/
Map<String,TableId> getTableIdMap();
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index cb88ce320c..f6b31af244 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@ -380,7 +380,7 @@ public class HostRegexTableLoadBalancer extends
TableLoadBalancer {
public long balance(BalanceParameters params) {
long minBalanceTime = 20_000;
// Iterate over the tables and balance each of them
- Map<String,TableId> tableIdMap = environment.getTableIdMap();
+ Map<String,TableId> tableIdMap = params.getTablesToBalance();
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -511,8 +511,8 @@ public class HostRegexTableLoadBalancer extends
TableLoadBalancer {
continue;
}
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
- getBalancerForTable(tableId).balance(
- new BalanceParamsImpl(currentView, migrations, newMigrations,
DataLevel.of(tableId)));
+ getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView,
migrations,
+ newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
if (newMigrations.isEmpty()) {
tableToTimeSinceNoMigrations.remove(tableId);
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
index 55a24c3094..84c9074b46 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
@@ -98,7 +98,8 @@ public class TableLoadBalancer implements TabletBalancer {
}
if (balancer == null) {
- log.info("Using balancer {} for table {}",
SimpleLoadBalancer.class.getName(), tableId);
+ log.info("Creating balancer {} limited to balancing table {}",
+ SimpleLoadBalancer.class.getName(), tableId);
balancer = new SimpleLoadBalancer(tableId);
}
perTableBalancers.put(tableId, balancer);
@@ -124,13 +125,14 @@ public class TableLoadBalancer implements TabletBalancer {
@Override
public long balance(BalanceParameters params) {
long minBalanceTime = 5_000;
- // Iterate over the tables and balance each of them
final DataLevel currentDataLevel =
DataLevel.valueOf(params.currentLevel());
- for (TableId tableId : environment.getTableIdMap().values()) {
+ for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet())
{
+ String tableName = entry.getKey();
+ TableId tableId = entry.getValue();
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
- long tableBalanceTime =
- getBalancerForTable(tableId).balance(new
BalanceParamsImpl(params.currentStatus(),
- params.currentMigrations(), newMigrations, currentDataLevel));
+ long tableBalanceTime = getBalancerForTable(tableId)
+ .balance(new BalanceParamsImpl(params.currentStatus(),
params.currentMigrations(),
+ newMigrations, currentDataLevel, Map.of(tableName, tableId)));
if (tableBalanceTime < minBalanceTime) {
minBalanceTime = tableBalanceTime;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
index 06235a10a1..4731a5a844 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.SortedMap;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -102,6 +103,15 @@ public interface TabletBalancer {
* @since 2.1.4
*/
String currentLevel();
+
+ /**
+ * This is the set of tables the balancer should consider. Balancing any
tables outside of this
+ * set will be ignored and result in an error in the logs.
+ *
+ * @return map of table names to table ids that should be balanced.
+ * @since 2.1.4
+ */
+ Map<String,TableId> getTablesToBalance();
}
/**
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
index e55eb379d2..f469b8fd2a 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
@@ -108,10 +108,11 @@ public class GroupBalancerTest {
}
};
- balance(balancer, maxMigrations, tid);
+ balance(balancer, maxMigrations, tid, Map.of("1", tid));
}
- public void balance(TabletBalancer balancer, int maxMigrations, TableId
tid) {
+ public void balance(TabletBalancer balancer, int maxMigrations, TableId
tid,
+ Map<String,TableId> tablesToBalance) {
while (true) {
Set<TabletId> migrations = new HashSet<>();
@@ -123,8 +124,8 @@ public class GroupBalancerTest {
new
org.apache.accumulo.core.master.thrift.TabletServerStatus()));
}
- balancer
- .balance(new BalanceParamsImpl(current, migrations, migrationsOut,
DataLevel.of(tid)));
+ balancer.balance(new BalanceParamsImpl(current, migrations,
migrationsOut,
+ DataLevel.of(tid), tablesToBalance));
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
"Max Migration exceeded " + maxMigrations + " " +
migrationsOut.size());
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
index 58a89ec626..5a201f15da 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
@@ -108,7 +108,7 @@ public class HostRegexTableLoadBalancerReconfigurationTest
// getOnlineTabletsForTable
UtilWaitThread.sleep(3000);
this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER, tables));
assertEquals(0, migrationsOut.size());
// Change property, simulate call by TableConfWatcher
@@ -120,7 +120,7 @@ public class HostRegexTableLoadBalancerReconfigurationTest
// populate the map with an older time value
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER, tables));
assertEquals(5, migrationsOut.size());
for (TabletMigration migration : migrationsOut) {
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
index 4d3162e02d..b508e0fb3a 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
@@ -98,7 +98,7 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
List<TabletMigration> migrationsOut = new ArrayList<>();
long wait =
this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER,
environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the tables before reaching max
assertEquals(4, migrationsOut.size());
@@ -109,7 +109,7 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
}
migrationsOut.clear();
wait = this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER,
environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching
max
assertEquals(4, migrationsOut.size());
@@ -120,7 +120,7 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
}
migrationsOut.clear();
wait = this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER,
environment.getTableIdMap()));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching
max
assertEquals(4, migrationsOut.size());
@@ -131,7 +131,7 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
}
migrationsOut.clear();
wait = this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER,
environment.getTableIdMap()));
assertEquals(20000, wait);
// no more balancing to do
assertEquals(0, migrationsOut.size());
@@ -148,7 +148,7 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
migrations.addAll(tableTablets.get(BAR.getTableName()));
long wait =
this.balance(new
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
- migrations, migrationsOut, DataLevel.USER));
+ migrations, migrationsOut, DataLevel.USER,
environment.getTableIdMap()));
assertEquals(20000, wait);
// no migrations should have occurred as 10 is the maxOutstandingMigrations
assertEquals(0, migrationsOut.size());
@@ -495,8 +495,8 @@ public class HostRegexTableLoadBalancerTest extends
BaseHostRegexTableLoadBalanc
init(DEFAULT_TABLE_PROPERTIES);
Set<TabletId> migrations = new HashSet<>();
List<TabletMigration> migrationsOut = new ArrayList<>();
- this.balance(
- new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut,
DataLevel.USER));
+ this.balance(new BalanceParamsImpl(createCurrent(15), migrations,
migrationsOut, DataLevel.USER,
+ environment.getTableIdMap()));
assertEquals(2, migrationsOut.size());
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
index 055898928b..41193380ac 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
@@ -204,7 +204,7 @@ public class SimpleLoadBalancerTest {
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers),
migrations, migrationsOut,
- DataLevel.USER));
+ DataLevel.USER, Map.of()));
if (migrationsOut.isEmpty()) {
break;
}
@@ -247,7 +247,7 @@ public class SimpleLoadBalancerTest {
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers),
migrations, migrationsOut,
- DataLevel.USER));
+ DataLevel.USER, Map.of()));
if (migrationsOut.isEmpty()) {
break;
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
index 8e9aefd028..6045f417a8 100644
---
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
@@ -142,13 +142,15 @@ public class TableLoadBalancerTest {
List<TabletMigration> migrationsOut = new ArrayList<>();
TableLoadBalancer tls = new TableLoadBalancer();
tls.init(environment);
- tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut,
DataLevel.USER));
+ tls.balance(
+ new BalanceParamsImpl(state, migrations, migrationsOut,
DataLevel.USER, tableIdMap));
assertEquals(0, migrationsOut.size());
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
tls = new TableLoadBalancer();
tls.init(environment);
- tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut,
DataLevel.USER));
+ tls.balance(
+ new BalanceParamsImpl(state, migrations, migrationsOut,
DataLevel.USER, tableIdMap));
int count = 0;
Map<TableId,Integer> movedByTable = new HashMap<>();
movedByTable.put(TableId.of(t1Id), 0);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index aa47e713e4..67a6656164 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -99,7 +99,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.process.thrift.ServerProcessService;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
-import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
+import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -999,20 +999,22 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
final Map<String,TableInfo> newTableMap =
new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
if (dl == DataLevel.ROOT) {
- if (oldTableMap.containsKey(RootTable.NAME)) {
- newTableMap.put(RootTable.NAME, oldTableMap.get(RootTable.NAME));
+ if (oldTableMap.containsKey(RootTable.ID.canonical())) {
+ newTableMap.put(RootTable.ID.canonical(),
oldTableMap.get(RootTable.ID.canonical()));
}
} else if (dl == DataLevel.METADATA) {
- if (oldTableMap.containsKey(MetadataTable.NAME)) {
- newTableMap.put(MetadataTable.NAME,
oldTableMap.get(MetadataTable.NAME));
+ if (oldTableMap.containsKey(MetadataTable.ID.canonical())) {
+ newTableMap.put(MetadataTable.ID.canonical(),
+ oldTableMap.get(MetadataTable.ID.canonical()));
}
} else if (dl == DataLevel.USER) {
- if (!oldTableMap.containsKey(MetadataTable.NAME)
- && !oldTableMap.containsKey(RootTable.NAME)) {
+ if (!oldTableMap.containsKey(MetadataTable.ID.canonical())
+ && !oldTableMap.containsKey(RootTable.ID.canonical())) {
newTableMap.putAll(oldTableMap);
} else {
oldTableMap.forEach((table, info) -> {
- if (!table.equals(RootTable.NAME) &&
!table.equals(MetadataTable.NAME)) {
+ if (!table.equals(RootTable.ID.canonical())
+ && !table.equals(MetadataTable.ID.canonical())) {
newTableMap.put(table, info);
}
});
@@ -1026,6 +1028,23 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
return tserverStatusForLevel;
}
+ private Map<String,TableId> getTablesForLevel(DataLevel dataLevel) {
+ switch (dataLevel) {
+ case ROOT:
+ return Map.of(RootTable.NAME, RootTable.ID);
+ case METADATA:
+ return Map.of(MetadataTable.NAME, MetadataTable.ID);
+ case USER: {
+ Map<String,TableId> userTables = new
HashMap<>(getContext().getTableNameToIdMap());
+ userTables.remove(RootTable.NAME);
+ userTables.remove(MetadataTable.NAME);
+ return Collections.unmodifiableMap(userTables);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown data level " +
dataLevel);
+ }
+ }
+
private long balanceTablets() {
final int tabletsNotHosted = notHosted();
@@ -1042,6 +1061,18 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
tabletsNotHosted);
continue;
}
+
+ if ((dl == DataLevel.METADATA || dl == DataLevel.USER)
+ && !partitionedMigrations.get(DataLevel.ROOT).isEmpty()) {
+ log.debug("Not balancing {} because {} has migrations", dl,
DataLevel.ROOT);
+ continue;
+ }
+
+ if (dl == DataLevel.USER &&
!partitionedMigrations.get(DataLevel.METADATA).isEmpty()) {
+ log.debug("Not balancing {} because {} has migrations", dl,
DataLevel.METADATA);
+ continue;
+ }
+
// Create a view of the tserver status such that it only contains the
tables
// for this level in the tableMap.
SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
@@ -1052,44 +1083,26 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
tserverStatusForLevel.forEach((tsi, status) ->
tserverStatusForBalancerLevel
.put(new TabletServerIdImpl(tsi),
TServerStatusImpl.fromThrift(status)));
+ log.debug("Balancing for tables at level {}", dl);
+
+ SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
+ tserverStatusForBalancerLevel;
+ params = BalanceParamsImpl.fromThrift(statusForBalancerLevel,
tserverStatusForLevel,
+ partitionedMigrations.get(dl), dl, getTablesForLevel(dl));
+ wait = Math.max(tabletBalancer.balance(params), wait);
long migrationsOutForLevel = 0;
- int attemptNum = 0;
- do {
- log.debug("Balancing for tables at level {}, times-in-loop: {}", dl,
++attemptNum);
-
- SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
- tserverStatusForBalancerLevel;
- if (attemptNum > 1 && (dl == DataLevel.ROOT || dl ==
DataLevel.METADATA)) {
- // If we are still migrating then perform a re-check on the tablet
- // servers to make sure non of them have failed.
- Set<TServerInstance> currentServers =
tserverSet.getCurrentServers();
- tserverStatus = gatherTableInformation(currentServers);
- // Create a view of the tserver status such that it only contains
the tables
- // for this level in the tableMap.
- tserverStatusForLevel = createTServerStatusView(dl, tserverStatus);
- final SortedMap<TabletServerId,TServerStatus>
tserverStatusForBalancerLevel2 =
- new TreeMap<>();
- tserverStatusForLevel.forEach((tsi, status) ->
tserverStatusForBalancerLevel2
- .put(new TabletServerIdImpl(tsi),
TServerStatusImpl.fromThrift(status)));
- statusForBalancerLevel = tserverStatusForBalancerLevel2;
+ for (TabletMigration m :
checkMigrationSanity(statusForBalancerLevel.keySet(),
+ params.migrationsOut(), dl)) {
+ final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
+ if (partitionedMigrations.get(dl).contains(ke)) {
+ log.warn("balancer requested migration more than once, skipping
{}", m);
+ continue;
}
+ migrationsOutForLevel++;
+ migrations.put(ke,
TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+ log.debug("migration {}", m);
+ }
- params = BalanceParamsImpl.fromThrift(statusForBalancerLevel,
tserverStatusForLevel,
- partitionedMigrations.get(dl), dl);
- wait = Math.max(tabletBalancer.balance(params), wait);
- migrationsOutForLevel = 0;
- for (TabletMigration m :
checkMigrationSanity(statusForBalancerLevel.keySet(),
- params.migrationsOut(), dl)) {
- final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
- if (migrations.containsKey(ke)) {
- log.warn("balancer requested migration more than once, skipping
{}", m);
- continue;
- }
- migrationsOutForLevel++;
- migrations.put(ke,
TabletServerIdImpl.toThrift(m.getNewTabletServer()));
- log.debug("migration {}", m);
- }
- } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl ==
DataLevel.METADATA));
totalMigrationsOut += migrationsOutForLevel;
// increment this at end of loop to signal complete run w/o any
continue
@@ -1115,7 +1128,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
if (m.getTablet() == null) {
log.error("Balancer gave back a null tablet {}", m);
} else if (DataLevel.of(m.getTablet().getTable()) != level) {
- log.trace(
+ log.warn(
"Balancer wants to move a tablet ({}) outside of the current
processing level ({}), "
+ "ignoring and should be processed at the correct level
({})",
m.getTablet(), level, DataLevel.of(m.getTablet().getTable()));
@@ -1946,7 +1959,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
void initializeBalancer() {
var localTabletBalancer =
Property.createInstanceFromPropertyName(getConfiguration(),
- Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new
SimpleLoadBalancer());
+ Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new
TableLoadBalancer());
localTabletBalancer.init(balancerEnvironment);
tabletBalancer = localTabletBalancer;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
index 0164463903..a282388ce7 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
@@ -18,27 +18,40 @@
*/
package org.apache.accumulo.test;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BalanceIT extends AccumuloClusterHarness {
+public class BalanceIT extends ConfigurableMacBase {
private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
@Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
Map<String,String> siteConfig = cfg.getSiteConfig();
siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
@@ -46,7 +59,7 @@ public class BalanceIT extends AccumuloClusterHarness {
siteConfig.put("general.custom.metrics.opts.logging.step", "0.5s");
cfg.setSiteConfig(siteConfig);
// ensure we have two tservers
- if (cfg.getNumTservers() < 2) {
+ if (cfg.getNumTservers() != 2) {
cfg.setNumTservers(2);
}
}
@@ -59,7 +72,7 @@ public class BalanceIT extends AccumuloClusterHarness {
@Test
public void testBalance() throws Exception {
String tableName = getUniqueNames(1)[0];
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
log.info("Creating table");
c.tableOperations().create(tableName);
SortedSet<Text> splits = new TreeSet<>();
@@ -72,4 +85,77 @@ public class BalanceIT extends AccumuloClusterHarness {
c.instanceOperations().waitForBalance();
}
}
+
+ @Test
+ public void testBalanceMetadata() throws Exception {
+ String tableName = getUniqueNames(1)[0];
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
+ SortedSet<Text> splits = new TreeSet<>();
+ for (int i = 0; i < 10; i++) {
+ splits.add(new Text("" + i));
+ }
+ c.tableOperations().create(tableName, new
NewTableConfiguration().withSplits(splits));
+
+ var metaSplits = IntStream.range(1, 100).mapToObj(i ->
Integer.toString(i, 36)).map(Text::new)
+ .collect(Collectors.toCollection(TreeSet::new));
+ c.tableOperations().addSplits(MetadataTable.NAME, metaSplits);
+
+ var locCounts = countLocations(c, MetadataTable.NAME);
+
+ c.instanceOperations().waitForBalance();
+
+ locCounts = countLocations(c, MetadataTable.NAME);
+ var stats = locCounts.values().stream().mapToInt(i ->
i).summaryStatistics();
+ assertTrue(stats.getMax() <= 51, locCounts.toString());
+ assertTrue(stats.getMin() >= 50, locCounts.toString());
+ assertEquals(2, stats.getCount(), locCounts.toString());
+
+ assertEquals(2, getCluster().getConfig().getNumTservers());
+ getCluster().getConfig().setNumTservers(4);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+ Wait.waitFor(() -> {
+ var lc = countLocations(c, MetadataTable.NAME);
+ log.info("locations:{}", lc);
+ return lc.size() == 4;
+ });
+
+ c.instanceOperations().waitForBalance();
+
+ locCounts = countLocations(c, MetadataTable.NAME);
+ stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics();
+ assertTrue(stats.getMax() <= 26, locCounts.toString());
+ assertTrue(stats.getMin() >= 25, locCounts.toString());
+ assertEquals(4, stats.getCount(), locCounts.toString());
+
+ // The user table should eventually balance
+ Wait.waitFor(() -> {
+ var lc = countLocations(c, tableName);
+ log.info("locations:{}", lc);
+ return lc.size() == 4;
+ });
+
+ locCounts = countLocations(c, tableName);
+ stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics();
+ assertTrue(stats.getMax() <= 3, locCounts.toString());
+ assertTrue(stats.getMin() >= 2, locCounts.toString());
+ assertEquals(4, stats.getCount(), locCounts.toString());
+ }
+ }
+
+ private Map<String,Integer> countLocations(AccumuloClient client, String
tableName)
+ throws Exception {
+ var ctx = ((ClientContext) client);
+ var ample = ctx.getAmple();
+ try (var tabletsMeta =
+
ample.readTablets().forTable(ctx.getTableId(tableName)).fetch(LOCATION,
PREV_ROW).build()) {
+ Map<String,Integer> locCounts = new HashMap<>();
+ for (var tabletMeta : tabletsMeta) {
+ var loc = tabletMeta.getLocation();
+ locCounts.merge(loc == null ? " none" : loc.toString(), 1,
Integer::sum);
+ }
+ return locCounts;
+ }
+ }
}
diff --git
a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
index 57fbd33247..5594aec10a 100644
--- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
@@ -159,7 +159,7 @@ public class ChaoticLoadBalancerTest {
List<TabletMigration> migrationsOut = new ArrayList<>();
while (!migrationsOut.isEmpty()) {
balancer.balance(new BalanceParamsImpl(getAssignments(servers),
migrations, migrationsOut,
- DataLevel.USER));
+ DataLevel.USER, Map.of()));
}
}