keith-turner commented on code in PR #5416:
URL: https://github.com/apache/accumulo/pull/5416#discussion_r2017750656
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -568,10 +573,13 @@ private class MigrationCleanupThread implements Runnable {
@Override
public void run() {
while (stillManager()) {
- if (!migrations.isEmpty()) {
+ // migrations are stored in the metadata tables which cannot be read until the
+ // TabletGroupWatchers are started
+ if (watchersStarted.get() && numMigrations() > 0) {
Review Comment:
Instead of having this `watchersStarted` boolean, you could move the place where the
cleanup thread is started to the point where you set this to true.
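A minimal sketch of that ordering, using hypothetical names (`StartupOrderSketch`, `startWatchers`, `startCleanupThread`) that are not from the PR: the cleanup thread is only created after the watchers are up, so its loop needs no flag check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the manager startup sequence; not Accumulo code.
class StartupOrderSketch {
  final List<String> events = new ArrayList<>();
  private Thread cleanupThread;

  void startWatchers() {
    events.add("watchers started");
  }

  // The cleanup body can read metadata unconditionally because this method
  // is only called after startWatchers() has returned.
  void startCleanupThread() {
    cleanupThread = new Thread(() -> events.add("cleanup ran"), "migration-cleanup-sketch");
    cleanupThread.start();
  }

  void run() {
    startWatchers();
    startCleanupThread(); // started here instead of gating the loop on a boolean
    try {
      cleanupThread.join(); // join only so the sketch finishes deterministically
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

Calling `new StartupOrderSketch().run()` records the watcher start before the cleanup run, which is the ordering the boolean was enforcing.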
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -568,10 +573,13 @@ private class MigrationCleanupThread implements Runnable {
@Override
public void run() {
while (stillManager()) {
- if (!migrations.isEmpty()) {
+ // migrations are stored in the metadata tables which cannot be read until the
+ // TabletGroupWatchers are started
+ if (watchersStarted.get() && numMigrations() > 0) {
try {
cleanupOfflineMigrations();
Review Comment:
It would probably be more efficient to collapse this function into
cleanupDeletedMigrations() and scan the metadata table once.
```java
for (var tabletMetadata : tabletsMetadata) {
  var migration = tabletMetadata.getMigration();
  // delete the migration if its target tserver is gone or the table is offline
  if (migration != null && (!onlineTabletServers().contains(migration)
      || OFFLINE == getTableState(tabletMetadata.getExtent()))) {
    tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
        .requireMigration(migration).deleteMigration().submit(tm -> false);
  }
}
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -984,23 +1031,23 @@ private long balanceTablets() {
continue;
}
migrationsOutForLevel++;
- migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+ getContext().getAmple().mutateTablet(ke)
Review Comment:
This will do a single write RPC and wait for it. It would be better to
create a tablet mutator once and use it for the entire loop. This will allow
much more efficient streaming of data to the write ahead logs on the tablet
servers, instead of making a single RPC that does a single write ahead log
write on the tablet server and waiting for it.
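To illustrate the cost difference, here is a rough sketch with a hypothetical in-memory store that only counts round trips. `RpcCountingStore` and `BatchMutator` are made-up names, not the Ample API, and a real mutator may flush in several batches rather than one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a write path; it only counts round trips and is not Ample.
class RpcCountingStore {
  int roundTrips = 0;
  final List<String> written = new ArrayList<>();

  // One write per call: every mutation pays for its own round trip.
  void writeOne(String mutation) {
    roundTrips++;
    written.add(mutation);
  }

  // A mutator that buffers mutations and sends them together on close().
  BatchMutator newMutator() {
    return new BatchMutator(this);
  }
}

class BatchMutator implements AutoCloseable {
  private final RpcCountingStore store;
  private final List<String> buffer = new ArrayList<>();

  BatchMutator(RpcCountingStore store) {
    this.store = store;
  }

  void add(String mutation) {
    buffer.add(mutation);
  }

  @Override
  public void close() {
    if (!buffer.isEmpty()) {
      store.roundTrips++; // the whole buffer goes out in one round trip
      store.written.addAll(buffer);
      buffer.clear();
    }
  }
}
```

Creating the mutator once outside the loop and calling `add` per tablet keeps the number of round trips independent of the number of migrations in this model.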
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -586,14 +594,9 @@ public void run() {
* the metadata table and remove any migrating tablets that no longer exist.
*/
private void cleanupNonexistentMigrations(final ClientContext clientContext) {
+ Map<DataLevel,Set<KeyExtent>> notSeen = partitionMigrations();
Review Comment:
This entire function can probably be removed. When migrations were held in
memory, that set could get out of sync with the metadata table. With migrations
stored in the metadata table, this should not happen as long as the following
conditions are met.
1. Migrations are added using a conditional mutation that checks the tablet
exists.
2. Split and merge operations on a tablet remove its migrations.
3. Deleting a table deletes all of its metadata, including migrations. This
should happen w/o any changes in this PR.
These conditions are being met by the changes in this PR AFAICT.
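The three conditions can be modeled with a small in-memory sketch. Everything here is hypothetical (`MigrationRowsSketch`, string extents, string tserver names) and only illustrates why a migration cannot outlive its tablet row when it is stored in that row.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of tablet metadata rows; not Accumulo code.
class MigrationRowsSketch {
  // tablet extent (as a string) -> migration target, empty if none
  private final Map<String, Optional<String>> rows = new HashMap<>();

  void createTablet(String extent) {
    rows.put(extent, Optional.empty());
  }

  // Condition 1: the migration is only written if the tablet row exists.
  boolean putMigrationIfTabletExists(String extent, String tserver) {
    return rows.computeIfPresent(extent, (k, v) -> Optional.of(tserver)) != null;
  }

  // Condition 2: a split replaces the row, so the migration goes with it.
  void split(String extent, String left, String right) {
    if (rows.remove(extent) != null) {
      createTablet(left);
      createTablet(right);
    }
  }

  // Condition 3: deleting a table drops all of its rows, migrations included.
  void deleteTable(String tablePrefix) {
    rows.keySet().removeIf(extent -> extent.startsWith(tablePrefix));
  }

  long migrationCount() {
    return rows.values().stream().filter(Optional::isPresent).count();
  }
}
```

In this model there is no separate migration set to reconcile, so a scan for nonexistent tablets has nothing to find.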
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -629,6 +635,30 @@ private void cleanupOfflineMigrations() {
}
}
}
+
+ /**
+ * Remove any migrations to any of the deleted TServers
+ */
+ private void cleanupDeletedMigrations() {
+ synchronized (deletedTServers) {
+ var iter = deletedTServers.iterator();
+ var ample = getContext().getAmple();
+ while (iter.hasNext()) {
+ var deletedTServer = iter.next();
+ for (DataLevel dl : DataLevel.values()) {
+ // prev row needed for the extent
+ try (var tabletsMetadata = ample.readTablets().forLevel(dl)
+ .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION)
+ .build()) {
+ for (var tabletMetadata : tabletsMetadata) {
+ conditionallyDeleteMigration(tabletMetadata.getExtent(), deletedTServer);
+ }
+ }
+ }
+ iter.remove();
+ }
+ }
+ }
Review Comment:
Tracking these deleted tservers is probably not needed; the set of online
tablet servers can be used instead. It would also be better to create a single
tablet mutator and use it for all tablets instead of creating one per tablet.
This allows streaming data from the metadata table to the tablet server write
ahead logs.
```suggestion
var ample = getContext().getAmple();
for (DataLevel dl : DataLevel.values()) {
  // prev row needed for the extent
  try (
      var tabletsMetadata = ample.readTablets().forLevel(dl)
          .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION)
          .build();
      var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) {
    for (var tabletMetadata : tabletsMetadata) {
      if (tabletMetadata.getMigration() != null
          && !onlineTabletServers().contains(tabletMetadata.getMigration())) {
        tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
            .requireMigration(tabletMetadata.getMigration()).deleteMigration()
            .submit(tm -> false);
      }
    }
  }
}
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -984,23 +1031,23 @@ private long balanceTablets() {
continue;
}
migrationsOutForLevel++;
- migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+ getContext().getAmple().mutateTablet(ke)
+ .putMigration(TabletServerIdImpl.toThrift(m.getNewTabletServer())).mutate();
log.debug("migration {}", m);
}
totalMigrationsOut += migrationsOutForLevel;
// increment this at end of loop to signal complete run w/o any continue
levelsCompleted++;
}
- balancerMetrics.assignMigratingCount(migrations::size);
+ balancerMetrics.assignMigratingCount(Manager.this::numMigrations);
Review Comment:
This will scan the metadata table again, even though it was scanned earlier. I
wonder if we can use the map from that scan plus a count of the new migrations
added here to get the new count. Not sure if that will work, but it may be
something to look into.
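One possible shape for that, as a sketch only: `MigrationCountSketch` and its string extents are hypothetical, and the result would be stale if any migration completed between the earlier scan and this point.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; extents are plain strings here, not KeyExtent.
class MigrationCountSketch {
  // existing: extents with a migration found by the earlier scan
  // proposed: extents the balancer wants to migrate in this pass
  static int countAfterBalance(Set<String> existing, List<String> proposed) {
    Set<String> seen = new HashSet<>(existing);
    int added = 0;
    for (String extent : proposed) {
      if (seen.add(extent)) { // skip extents that already had a migration
        added++;
      }
    }
    return existing.size() + added;
  }
}
```

The metric supplier could then return this precomputed value instead of triggering another scan.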