This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit b533f44c979cc8e40e980ec7d1d363b8df79956f
Merge: e0b53a47bc 5bba2f3cf3
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 2 20:52:09 2025 +0000

    Merge commit '5bba2f3cf36717d0e2fabbf4abb93c3455c0619e'

 .../org/apache/accumulo/tserver/TabletServer.java  |  2 +-
 .../accumulo/tserver/tablet/ScanfileManager.java   | 68 ++++++++++------------
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 18 +++++-
 3 files changed, 49 insertions(+), 39 deletions(-)

diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
index 68ced7e711,0000000000..78019e59ca
mode 100644,000000..100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
@@@ -1,182 -1,0 +1,178 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +class ScanfileManager {
 +  private final Logger log = LoggerFactory.getLogger(ScanfileManager.class);
 +  private final Tablet tablet;
 +
 +  ScanfileManager(Tablet tablet) {
 +    this.tablet = tablet;
 +  }
 +
 +  private final Set<StoredTabletFile> filesToDeleteAfterScan = new 
HashSet<>();
 +  private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new 
HashMap<>();
 +  private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new 
MapCounter<>();
 +  private long nextScanReservationId = 0;
 +
 +  static void rename(VolumeManager fs, Path src, Path dst) throws IOException 
{
 +    if (!fs.rename(src, dst)) {
 +      throw new IOException("Rename " + src + " to " + dst + " returned false 
");
 +    }
 +  }
 +
 +  /**
 +   * Removes any scan-in-use metadata entries that were left behind when a 
scan cleanup was
 +   * interrupted. Intended to be called periodically to clear these orphaned 
scan refs once their
 +   * in-memory reference count reaches zero.
 +   */
-   public void removeOrphanedScanRefs() {
++  public void removeBatchedScanRefs() {
 +    Set<StoredTabletFile> snapshot;
 +    Location currLoc;
 +    synchronized (tablet) {
 +      snapshot = new HashSet<>(filesToDeleteAfterScan);
 +      filesToDeleteAfterScan.clear();
 +      currLoc = Location.current(tablet.getTabletServer().getTabletSession());
 +    }
 +    removeFilesAfterScan(snapshot, currLoc);
 +  }
 +
 +  static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile> 
scanFiles,
 +      ServerContext context, Location currLocation) {
 +    try (var mutator = context.getAmple().conditionallyMutateTablets()) {
 +      var tabletMutator = 
mutator.mutateTablet(extent).requireLocation(currLocation);
 +
 +      scanFiles.forEach(tabletMutator::deleteScan);
 +
 +      tabletMutator
 +          .submit(tabletMetadata -> Collections.disjoint(scanFiles, 
tabletMetadata.getScans()));
 +
 +      var result = mutator.process().get(extent);
 +      Preconditions.checkState(result.getStatus() == 
Ample.ConditionalResult.Status.ACCEPTED,
 +          "Failed to remove scan file entries for %s", extent);
 +    }
 +  }
 +
 +  Pair<Long,Map<StoredTabletFile,DataFileValue>> reserveFilesForScan() {
 +    synchronized (tablet) {
 +
 +      var tabletsFiles = tablet.getDatafiles();
 +      Set<StoredTabletFile> absFilePaths = new 
HashSet<>(tabletsFiles.keySet());
 +
 +      long rid = nextScanReservationId++;
 +
 +      scanFileReservations.put(rid, absFilePaths);
 +
 +      Map<StoredTabletFile,DataFileValue> ret = new HashMap<>();
 +
 +      for (StoredTabletFile path : absFilePaths) {
 +        fileScanReferenceCounts.increment(path, 1);
 +        ret.put(path, tabletsFiles.get(path));
 +      }
 +
 +      return new Pair<>(rid, ret);
 +    }
 +  }
 +
 +  void returnFilesForScan(Long reservationId) {
++    synchronized (tablet) {
++      Set<StoredTabletFile> absFilePaths = 
scanFileReservations.remove(reservationId);
 +
-     final Set<StoredTabletFile> filesToDelete = new HashSet<>();
- 
-     try {
-       synchronized (tablet) {
-         Set<StoredTabletFile> absFilePaths = 
scanFileReservations.remove(reservationId);
- 
-         if (absFilePaths == null) {
-           throw new IllegalArgumentException("Unknown scan reservation id " + 
reservationId);
-         }
++      if (absFilePaths == null) {
++        throw new IllegalArgumentException("Unknown scan reservation id " + 
reservationId);
++      }
 +
-         boolean notify = false;
-         try {
-           for (StoredTabletFile path : absFilePaths) {
-             long refCount = fileScanReferenceCounts.decrement(path, 1);
-             if (refCount == 0) {
-               if (filesToDeleteAfterScan.remove(path)) {
-                 filesToDelete.add(path);
-               }
-               notify = true;
-             } else if (refCount < 0) {
-               throw new IllegalStateException("Scan ref count for " + path + 
" is " + refCount);
-             }
-           }
-         } finally {
-           if (notify) {
-             tablet.notifyAll();
++      boolean notify = false;
++      try {
++        for (StoredTabletFile path : absFilePaths) {
++          long refCount = fileScanReferenceCounts.decrement(path, 1);
++          if (refCount == 0) {
++            notify = true;
++          } else if (refCount < 0) {
++            throw new IllegalStateException("Scan ref count for " + path + " 
is " + refCount);
 +          }
 +        }
-       }
-     } finally {
-       if (!filesToDelete.isEmpty()) {
-         // Remove scan files even if the loop above did not fully complete 
because once a
-         // file is in the set filesToDelete that means it was removed from 
filesToDeleteAfterScan
-         // and would never be added back.
-         log.debug("Removing scan refs from metadata {} {}", 
tablet.getExtent(), filesToDelete);
- 
-         var currLoc = 
Location.current(tablet.getTabletServer().getTabletSession());
-         removeScanFiles(tablet.getExtent(), filesToDelete, 
tablet.getContext(), currLoc);
++      } finally {
++        if (notify) {
++          tablet.notifyAll();
++        }
 +      }
 +    }
 +  }
 +
 +  void removeFilesAfterScan(Collection<StoredTabletFile> scanFiles, Location 
location) {
 +    if (scanFiles.isEmpty()) {
 +      return;
 +    }
 +
 +    Set<StoredTabletFile> filesToDelete = new HashSet<>();
 +
 +    synchronized (tablet) {
 +      for (StoredTabletFile path : scanFiles) {
 +        if (fileScanReferenceCounts.get(path) == 0) {
 +          filesToDelete.add(path);
 +        } else {
 +          filesToDeleteAfterScan.add(path);
 +        }
 +      }
 +    }
 +
 +    if (!filesToDelete.isEmpty()) {
 +      log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), 
filesToDelete);
 +      removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), 
location);
 +    }
 +  }
++
++  /**
++   * @return true if any file is no longer in use by a scan and can be 
removed, false otherwise.
++   */
++  boolean canScanRefsBeRemoved() {
++    synchronized (tablet) {
++      for (var path : filesToDeleteAfterScan) {
++        if (fileScanReferenceCounts.get(path) == 0) {
++          return true;
++        }
++      }
++      return false;
++    }
++  }
 +}
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index bb590f96c9,8e296d400d..4325be7d3a
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -1493,11 -2213,29 +1493,25 @@@ public class Tablet extends TabletBase 
  
    @Override
    public void returnFilesForScan(long scanId) {
 -    getDatafileManager().returnFilesForScan(scanId);
 -  }
 -
 -  public MetadataUpdateCount getUpdateCount() {
 -    return getDatafileManager().getUpdateCount();
 +    getScanfileManager().returnFilesForScan(scanId);
    }
  
-   public void removeOrphanedScanRefs() {
-     getScanfileManager().removeOrphanedScanRefs();
+   public void removeBatchedScanRefs() {
+     synchronized (this) {
+       if (isClosed() || isClosing()) {
+         return;
+       }
+       // return early if there are no scan files to remove
 -      if (!getDatafileManager().canScanRefsBeRemoved()) {
++      if (!getScanfileManager().canScanRefsBeRemoved()) {
+         return;
+       }
+       incrementWritesInProgress();
+     }
+     try {
 -      getDatafileManager().removeBatchedScanRefs();
++      getScanfileManager().removeBatchedScanRefs();
+     } finally {
+       decrementWritesInProgress();
+     }
    }
  
    TabletMemory getTabletMemory() {

Reply via email to