This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit b533f44c979cc8e40e980ec7d1d363b8df79956f Merge: e0b53a47bc 5bba2f3cf3 Author: Keith Turner <[email protected]> AuthorDate: Wed Jul 2 20:52:09 2025 +0000 Merge commit '5bba2f3cf36717d0e2fabbf4abb93c3455c0619e' .../org/apache/accumulo/tserver/TabletServer.java | 2 +- .../accumulo/tserver/tablet/ScanfileManager.java | 68 ++++++++++------------ .../org/apache/accumulo/tserver/tablet/Tablet.java | 18 +++++- 3 files changed, 49 insertions(+), 39 deletions(-) diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java index 68ced7e711,0000000000..78019e59ca mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java @@@ -1,182 -1,0 +1,178 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +class ScanfileManager { + private final Logger log = LoggerFactory.getLogger(ScanfileManager.class); + private final Tablet tablet; + + ScanfileManager(Tablet tablet) { + this.tablet = tablet; + } + + private final Set<StoredTabletFile> filesToDeleteAfterScan = new HashSet<>(); + private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new HashMap<>(); + private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new MapCounter<>(); + private long nextScanReservationId = 0; + + static void rename(VolumeManager fs, Path src, Path dst) throws IOException { + if (!fs.rename(src, dst)) { + throw new IOException("Rename " + src + " to " + dst + " returned false "); + } + } + + /** + * Removes any scan-in-use metadata entries that were left behind when a scan cleanup was + * interrupted. Intended to be called periodically to clear these orphaned scan refs once their + * in-memory reference count reaches zero. + */ - public void removeOrphanedScanRefs() { ++ public void removeBatchedScanRefs() { + Set<StoredTabletFile> snapshot; + Location currLoc; + synchronized (tablet) { + snapshot = new HashSet<>(filesToDeleteAfterScan); + filesToDeleteAfterScan.clear(); + currLoc = Location.current(tablet.getTabletServer().getTabletSession()); + } + removeFilesAfterScan(snapshot, currLoc); + } + + static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile> scanFiles, + ServerContext context, Location currLocation) { + try (var mutator = context.getAmple().conditionallyMutateTablets()) { + var tabletMutator = mutator.mutateTablet(extent).requireLocation(currLocation); + + scanFiles.forEach(tabletMutator::deleteScan); + + tabletMutator + .submit(tabletMetadata -> Collections.disjoint(scanFiles, tabletMetadata.getScans())); + + var result = mutator.process().get(extent); + Preconditions.checkState(result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED, + "Failed to remove scan file entries for %s", extent); + } + } + + Pair<Long,Map<StoredTabletFile,DataFileValue>> reserveFilesForScan() { + synchronized (tablet) { + + var tabletsFiles = tablet.getDatafiles(); + Set<StoredTabletFile> absFilePaths = new HashSet<>(tabletsFiles.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map<StoredTabletFile,DataFileValue> ret = new HashMap<>(); + + for (StoredTabletFile path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path, tabletsFiles.get(path)); + } + + return new Pair<>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { ++ synchronized (tablet) { ++ Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); + - final Set<StoredTabletFile> filesToDelete = new HashSet<>(); - - try { - synchronized (tablet) { - Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); - - if (absFilePaths == null) { - throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); - } ++ if (absFilePaths == null) { ++ throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); ++ } + - boolean notify = false; - try { - for (StoredTabletFile path : absFilePaths) { - long refCount = fileScanReferenceCounts.decrement(path, 1); - if (refCount == 0) { - if (filesToDeleteAfterScan.remove(path)) { - filesToDelete.add(path); - } - notify = true; - } else if (refCount < 0) { - throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); - } - } - } finally { - if (notify) { - tablet.notifyAll(); ++ boolean notify = false; ++ try { ++ for (StoredTabletFile path : absFilePaths) { ++ long refCount = fileScanReferenceCounts.decrement(path, 1); ++ if (refCount == 0) { ++ notify = true; ++ } else if (refCount < 0) { ++ throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + } - } - } finally { - if (!filesToDelete.isEmpty()) { - // Remove scan files even if the loop above did not fully complete because once a - // file is in the set filesToDelete that means it was removed from filesToDeleteAfterScan - // and would never be added back. - log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); - - var currLoc = Location.current(tablet.getTabletServer().getTabletSession()); - removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), currLoc); ++ } finally { ++ if (notify) { ++ tablet.notifyAll(); ++ } + } + } + } + + void removeFilesAfterScan(Collection<StoredTabletFile> scanFiles, Location location) { + if (scanFiles.isEmpty()) { + return; + } + + Set<StoredTabletFile> filesToDelete = new HashSet<>(); + + synchronized (tablet) { + for (StoredTabletFile path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) { + filesToDelete.add(path); + } else { + filesToDeleteAfterScan.add(path); + } + } + } + + if (!filesToDelete.isEmpty()) { + log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); + removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), location); + } + } ++ ++ /** ++ * @return true if any file is no longer in use by a scan and can be removed, false otherwise. ++ */ ++ boolean canScanRefsBeRemoved() { ++ synchronized (tablet) { ++ for (var path : filesToDeleteAfterScan) { ++ if (fileScanReferenceCounts.get(path) == 0) { ++ return true; ++ } ++ } ++ return false; ++ } ++ } +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index bb590f96c9,8e296d400d..4325be7d3a --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -1493,11 -2213,29 +1493,25 @@@ public class Tablet extends TabletBase @Override public void returnFilesForScan(long scanId) { - getDatafileManager().returnFilesForScan(scanId); - } - - public MetadataUpdateCount getUpdateCount() { - return getDatafileManager().getUpdateCount(); + getScanfileManager().returnFilesForScan(scanId); } - public void removeOrphanedScanRefs() { - getScanfileManager().removeOrphanedScanRefs(); + public void removeBatchedScanRefs() { + synchronized (this) { + if (isClosed() || isClosing()) { + return; + } + // return early if there are no scan files to remove - if (!getDatafileManager().canScanRefsBeRemoved()) { ++ if (!getScanfileManager().canScanRefsBeRemoved()) { + return; + } + incrementWritesInProgress(); + } + try { - getDatafileManager().removeBatchedScanRefs(); ++ getScanfileManager().removeBatchedScanRefs(); + } finally { + decrementWritesInProgress(); + } } TabletMemory getTabletMemory() {
