This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit c22cd0ce243f678674e3d8b4c73b2634e467f3ef
Merge: fa09c1a7f5 a89aadcaf8
Author: Daniel Roberts ddanielr <[email protected]>
AuthorDate: Mon Jun 23 18:43:14 2025 +0000

    Merge branch '2.1'

 .../apache/accumulo/manager/TabletGroupWatcher.java   | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 4481ec28fd,5604e69c70..c9ff9ba426
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -56,34 -53,43 +56,35 @@@ import org.apache.accumulo.core.data.Ra
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.gc.ReferenceFile;
  import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger;
  import org.apache.accumulo.core.logging.TabletLogger;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
  import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
  import org.apache.accumulo.core.manager.thrift.ManagerState;
 -import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 -import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.RootTable;
 -import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
 -import 
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
  import org.apache.accumulo.core.metadata.TabletState;
  import org.apache.accumulo.core.metadata.schema.Ample;
 -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 -import org.apache.accumulo.core.metadata.schema.MetadataTime;
 +import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
  import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+ import org.apache.accumulo.core.util.Timer;
 +import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
 -import org.apache.accumulo.manager.Manager.TabletGoalState;
 -import org.apache.accumulo.manager.state.MergeStats;
 +import org.apache.accumulo.manager.metrics.ManagerMetrics;
  import org.apache.accumulo.manager.state.TableCounts;
  import org.apache.accumulo.manager.state.TableStats;
 -import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 +import org.apache.accumulo.server.conf.CheckCompactionConfig;
  import org.apache.accumulo.server.conf.TableConfiguration;
 -import org.apache.accumulo.server.gc.AllVolumesDirectory;
 +import org.apache.accumulo.server.fs.VolumeUtil;
  import org.apache.accumulo.server.log.WalStateManager;
  import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
  import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@@ -997,37 -1115,29 +998,46 @@@ abstract class TabletGroupWatcher exten
      }
    }
  
 -  private void flushChanges(TabletLists tLists, WalStateManager wals)
 +  private final Lock flushLock = new ReentrantLock();
 +
 +  private void flushChanges(TabletLists tLists)
        throws DistributedStoreException, TException, WalMarkerException {
      var unassigned = Collections.unmodifiableMap(tLists.unassigned);
 -
 -    handleDeadTablets(tLists, wals);
 -
 -    int beforeSize = tLists.assignments.size();
 -    Timer timer = Timer.startNew();
--
 -    getAssignmentsFromBalancer(tLists, unassigned);
 -    if (!unassigned.isEmpty()) {
 -      Manager.log.debug("[{}] requested assignments for {} tablets and got {} 
in {} ms",
 -          store.name(), unassigned.size(), tLists.assignments.size() - 
beforeSize,
 -          timer.elapsed(TimeUnit.MILLISECONDS));
++    Timer timer;
 +    flushLock.lock();
 +    try {
 +      // This method was originally only ever called by one thread. The code 
was modified so that
 +      // two threads could possibly call this flush method concurrently. It 
is not clear the
 +      // following methods are thread safe so a lock is acquired out of 
caution. Balancer plugins
 +      // may not expect multiple threads to call them concurrently, Accumulo 
has not done this in
 +      // the past. The log recovery code needs to be evaluated for thread 
safety.
 +      handleDeadTablets(tLists);
 +
++      int beforeSize = tLists.assignments.size();
++      timer = Timer.startNew();
++
 +      getAssignmentsFromBalancer(tLists, unassigned);
++      if (!unassigned.isEmpty()) {
++        Manager.log.debug("[{}] requested assignments for {} tablets and got 
{} in {} ms",
++            store.name(), unassigned.size(), tLists.assignments.size() - 
beforeSize,
++            timer.elapsed(TimeUnit.MILLISECONDS));
++      }
 +    } finally {
 +      flushLock.unlock();
      }
  
 +    Set<KeyExtent> failedFuture = Set.of();
      if (!tLists.assignments.isEmpty()) {
-       Manager.log.info(String.format("Assigning %d tablets", 
tLists.assignments.size()));
+       Manager.log.info("Assigning {} tablets", tLists.assignments.size());
 -      store.setFutureLocations(tLists.assignments);
 +      failedFuture = store.setFutureLocations(tLists.assignments);
      }
      tLists.assignments.addAll(tLists.assigned);
+     timer.restart();
      for (Assignment a : tLists.assignments) {
 +      if (failedFuture.contains(a.tablet)) {
 +        // do not ask a tserver to load a tablet where the future location 
could not be set
 +        continue;
 +      }
        try {
          TServerConnection client = manager.tserverSet.getConnection(a.server);
          if (client != null) {
@@@ -1042,45 -1152,10 +1052,48 @@@
              tException);
        }
      }
- 
+     if (!tLists.assignments.isEmpty()) {
+       Manager.log.debug("[{}] sent {} assignment messages in {} ms", 
store.name(),
+           tLists.assignments.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+     }
 +    replaceVolumes(tLists.volumeReplacements);
 +  }
 +
 +  private void replaceVolumes(List<VolumeUtil.VolumeReplacements> 
volumeReplacementsList) {
 +    try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
 +      for (VolumeUtil.VolumeReplacements vr : volumeReplacementsList) {
 +        var tabletMutator =
 +            
tabletsMutator.mutateTablet(vr.tabletMeta.getExtent()).requireAbsentOperation()
 +                .requireAbsentLocation().requireSame(vr.tabletMeta, FILES, 
LOGS);
 +        vr.logsToRemove.forEach(tabletMutator::deleteWal);
 +        vr.logsToAdd.forEach(tabletMutator::putWal);
 +
 +        vr.filesToRemove.forEach(tabletMutator::deleteFile);
 +        vr.filesToAdd.forEach(tabletMutator::putFile);
 +
 +        tabletMutator.submit(tm -> {
 +          // Check to see if the logs and files are removed. Checking if the 
new files or logs were
 +          // added has a race condition, those could have been successfully 
added and then removed
 +          // before this check runs, like if a compaction runs. Once the old 
volumes are removed
 +          // nothing should ever add them again.
 +          var logsRemoved =
 +              Collections.disjoint(Set.copyOf(tm.getLogs()), 
Set.copyOf(vr.logsToRemove));
 +          var filesRemoved = Collections.disjoint(tm.getFiles(), 
Set.copyOf(vr.filesToRemove));
 +          LOG.debug(
 +              "replaceVolume conditional mutation rejection check {} 
logsRemoved:{} filesRemoved:{}",
 +              tm.getExtent(), logsRemoved, filesRemoved);
 +          return logsRemoved && filesRemoved;
 +        }, () -> "replace volume");
 +      }
 +
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() == Ample.ConditionalResult.Status.REJECTED) {
 +          // log that failure happened, should try again later
 +          LOG.debug("Failed to update volumes for tablet {}", extent);
 +        }
 +      });
 +    }
 +
    }
  
    private static void markDeadServerLogsAsClosed(WalStateManager mgr,

Reply via email to