This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit c22cd0ce243f678674e3d8b4c73b2634e467f3ef Merge: fa09c1a7f5 a89aadcaf8 Author: Daniel Roberts ddanielr <[email protected]> AuthorDate: Mon Jun 23 18:43:14 2025 +0000 Merge branch '2.1' .../apache/accumulo/manager/TabletGroupWatcher.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 4481ec28fd,5604e69c70..c9ff9ba426 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -56,34 -53,43 +56,35 @@@ import org.apache.accumulo.core.data.Ra import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger; import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerState; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; -import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; + import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; -import org.apache.accumulo.manager.Manager.TabletGoalState; -import org.apache.accumulo.manager.state.MergeStats; +import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; -import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.conf.CheckCompactionConfig; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.gc.AllVolumesDirectory; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; @@@ -997,37 -1115,29 +998,46 @@@ abstract class TabletGroupWatcher exten } } - private void flushChanges(TabletLists tLists, WalStateManager wals) + private final Lock flushLock = new ReentrantLock(); + + private void flushChanges(TabletLists tLists) throws DistributedStoreException, TException, WalMarkerException { var unassigned = Collections.unmodifiableMap(tLists.unassigned); - - handleDeadTablets(tLists, wals); - - int beforeSize = tLists.assignments.size(); - Timer timer = Timer.startNew(); -- - getAssignmentsFromBalancer(tLists, unassigned); - if (!unassigned.isEmpty()) { - Manager.log.debug("[{}] requested assignments for {} tablets and got {} in {} ms", - store.name(), unassigned.size(), tLists.assignments.size() - beforeSize, - timer.elapsed(TimeUnit.MILLISECONDS)); ++ Timer timer; + flushLock.lock(); + try { + // This method was originally only ever called by one thread. The code was modified so that + // two threads could possibly call this flush method concurrently. It is not clear the + // following methods are thread safe so a lock is acquired out of caution. Balancer plugins + // may not expect multiple threads to call them concurrently, Accumulo has not done this in + // the past. The log recovery code needs to be evaluated for thread safety. + handleDeadTablets(tLists); + ++ int beforeSize = tLists.assignments.size(); ++ timer = Timer.startNew(); ++ + getAssignmentsFromBalancer(tLists, unassigned); ++ if (!unassigned.isEmpty()) { ++ Manager.log.debug("[{}] requested assignments for {} tablets and got {} in {} ms", ++ store.name(), unassigned.size(), tLists.assignments.size() - beforeSize, ++ timer.elapsed(TimeUnit.MILLISECONDS)); ++ } + } finally { + flushLock.unlock(); } + Set<KeyExtent> failedFuture = Set.of(); if (!tLists.assignments.isEmpty()) { - Manager.log.info(String.format("Assigning %d tablets", tLists.assignments.size())); + Manager.log.info("Assigning {} tablets", tLists.assignments.size()); - store.setFutureLocations(tLists.assignments); + failedFuture = store.setFutureLocations(tLists.assignments); } tLists.assignments.addAll(tLists.assigned); + timer.restart(); for (Assignment a : tLists.assignments) { + if (failedFuture.contains(a.tablet)) { + // do not ask a tserver to load a tablet where the future location could not be set + continue; + } try { TServerConnection client = manager.tserverSet.getConnection(a.server); if (client != null) { @@@ -1042,45 -1152,10 +1052,48 @@@ tException); } } - + if (!tLists.assignments.isEmpty()) { + Manager.log.debug("[{}] sent {} assignment messages in {} ms", store.name(), + tLists.assignments.size(), timer.elapsed(TimeUnit.MILLISECONDS)); + } + replaceVolumes(tLists.volumeReplacements); + } + + private void replaceVolumes(List<VolumeUtil.VolumeReplacements> volumeReplacementsList) { + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + for (VolumeUtil.VolumeReplacements vr : volumeReplacementsList) { + var tabletMutator = + tabletsMutator.mutateTablet(vr.tabletMeta.getExtent()).requireAbsentOperation() + .requireAbsentLocation().requireSame(vr.tabletMeta, FILES, LOGS); + vr.logsToRemove.forEach(tabletMutator::deleteWal); + vr.logsToAdd.forEach(tabletMutator::putWal); + + vr.filesToRemove.forEach(tabletMutator::deleteFile); + vr.filesToAdd.forEach(tabletMutator::putFile); + + tabletMutator.submit(tm -> { + // Check to see if the logs and files are removed. Checking if the new files or logs were + // added has a race condition, those could have been successfully added and then removed + // before this check runs, like if a compaction runs. Once the old volumes are removed + // nothing should ever add them again. + var logsRemoved = + Collections.disjoint(Set.copyOf(tm.getLogs()), Set.copyOf(vr.logsToRemove)); + var filesRemoved = Collections.disjoint(tm.getFiles(), Set.copyOf(vr.filesToRemove)); + LOG.debug( + "replaceVolume conditional mutation rejection check {} logsRemoved:{} filesRemoved:{}", + tm.getExtent(), logsRemoved, filesRemoved); + return logsRemoved && filesRemoved; + }, () -> "replace volume"); + } + + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.REJECTED) { + // log that failure happened, should try again later + LOG.debug("Failed to update volumes for tablet {}", extent); + } + }); + } + } private static void markDeadServerLogsAsClosed(WalStateManager mgr,
