This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 52de3d5637411d64b2d7dcd3040d5c798a16f073
Merge: 7e843611d2 b3aed75b87
Author: Dave Marion <[email protected]>
AuthorDate: Thu Jun 5 13:24:59 2025 +0000

    Merge branch '2.1'

 .../iteratorsImpl/ClientIteratorEnvironment.java   |  23 +-
 .../iterators/SystemIteratorEnvironmentImpl.java   | 131 +++++++++++
 .../iterators/TabletIteratorEnvironment.java       | 241 ---------------------
 .../manager/state/TabletManagementIterator.java    |   4 +-
 .../server/manager/state/ZooTabletStateStore.java  |   9 +-
 .../server/tablets/ConditionCheckerContext.java    |  13 +-
 .../org/apache/accumulo/compactor/ExtCEnv.java     |  44 +++-
 .../apache/accumulo/tserver/tablet/MinCEnv.java    |   8 +-
 .../accumulo/tserver/tablet/ScanDataSource.java    |  15 +-
 .../apache/accumulo/tserver/InMemoryMapTest.java   |  99 ++++-----
 .../test/functional/MemoryConsumingCompactor.java  |  12 +-
 .../test/performance/scan/CollectTabletStats.java  |   7 +-
 12 files changed, 273 insertions(+), 333 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java
index 417a38ffcd,a4778441ea..e593ea8f41
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java
@@@ -28,10 -29,14 +28,11 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.sample.SamplerConfiguration;
  import org.apache.accumulo.core.clientImpl.ClientContext;
  import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
 -import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.TableId;
 -import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.IteratorEnvironment;
  import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 -import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
  import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.spi.common.ServiceEnvironment;
  
  public class ClientIteratorEnvironment implements IteratorEnvironment {
  
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index e542a3a2ea,0000000000..ca5b0bafbc
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@@ -1,345 -1,0 +1,345 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import java.io.IOException;
 +import java.util.AbstractMap;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.PluginEnvironment.Configuration;
 +import org.apache.accumulo.core.client.ScannerBase;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 +import org.apache.accumulo.core.metadata.TabletState;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 +import org.apache.accumulo.core.spi.balancer.DoNothingBalancer;
 +import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 +import org.apache.accumulo.server.fs.VolumeUtil;
- import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 +import org.apache.accumulo.server.split.SplitUtils;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Iterator used by the TabletGroupWatcher threads in the Manager. This 
iterator returns
 + * TabletManagement objects for each Tablet that needs some type of action 
performed on it by the
 + * Manager.
 + */
 +public class TabletManagementIterator extends WholeRowIterator {
 +  private static final Logger LOG = 
LoggerFactory.getLogger(TabletManagementIterator.class);
 +  public static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams";
 +  private CompactionJobGenerator compactionGenerator;
 +  private TabletBalancer balancer;
 +  private final SplitConfig splitConfig = new SplitConfig();
 +
 +  private static class SplitConfig {
 +    TableId tableId;
 +    long splitThreshold;
 +    long maxEndRowSize;
 +    int maxFilesToOpen;
 +
 +    void update(TableId tableId, Configuration tableConfig) {
 +      if (!tableId.equals(this.tableId)) {
 +        this.tableId = tableId;
 +        splitThreshold = ConfigurationTypeHelper
 +            
.getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
 +        maxEndRowSize = ConfigurationTypeHelper
 +            
.getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey()));
 +        maxFilesToOpen = (int) ConfigurationTypeHelper
 +            
.getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey()));
 +      }
 +    }
 +  }
 +
 +  private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
 +      final Configuration tableConfig, SplitConfig splitConfig) {
 +
 +    // Should see the same table many times in a row, so this should only 
read config the first time
 +    // seen. Reading the config for each tablet was showing up as performance 
problem when profiling
 +    // SplitMillionIT that reads one million tablets. It is also nice to have 
snapshot of config
 +    // that is used for all tablet in a table.
 +    splitConfig.update(tm.getTableId(), tableConfig);
 +
 +    // If the current computed metadata matches the current marker then we 
can't split,
 +    // so we return false. If the marker is set but doesn't match then return 
true
 +    // which gives a chance to clean up the marker and recheck.
 +    var unsplittable = tm.getUnSplittable();
 +    if (unsplittable != null) {
 +      return !unsplittable
 +          .equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), 
splitConfig.splitThreshold,
 +              splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, 
tm.getFiles()));
 +    }
 +
 +    // If unsplittable is not set at all then check if over split threshold
 +    final boolean shouldSplit = 
SplitUtils.needsSplit(splitConfig.splitThreshold, tm);
 +    LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", 
tm.getExtent(),
 +        tm.getFileSize(), splitConfig.splitThreshold, shouldSplit);
 +    return shouldSplit;
 +  }
 +
 +  private boolean shouldReturnDueToLocation(final TabletMetadata tm) {
 +
 +    if (tm.getExtent().isRootTablet()) {
 +      return true;
 +    }
 +
 +    if (tm.getMigration() != null) {
 +      // Ideally only the state and goalState would need to be used to 
determine if a tablet should
 +      // be returned. However, the Manager/TGW currently needs everything 
currently migrating
 +      // returned so it can update the migrations. If this were improved then 
this case would not
 +      // be needed.
 +      return true;
 +    }
 +
 +    TabletState state = TabletState.compute(tm, 
tabletMgmtParams.getOnlineTsevers());
 +    TabletGoalState goalState = TabletGoalState.compute(tm, state, balancer, 
tabletMgmtParams);
 +    if (LOG.isTraceEnabled()) {
 +      LOG.trace(
 +          "extent:{} state:{} goalState:{} tabletAvailability:{}, 
hostingRequested: {}, opId: {}",
 +          tm.getExtent(), state, goalState, tm.getTabletAvailability(), 
tm.getHostingRequested(),
 +          tm.getOperationId());
 +    }
 +
 +    switch (goalState) {
 +      case HOSTED:
 +        return state != TabletState.HOSTED;
 +      case SUSPENDED:
 +        return state != TabletState.SUSPENDED;
 +      case UNASSIGNED:
 +        return state != TabletState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("unknown goal state " + goalState);
 +    }
 +  }
 +
 +  public static void configureScanner(final ScannerBase scanner,
 +      final TabletManagementParameters tabletMgmtParams) {
 +    // Note : if the scanner is ever made to fetch columns, then 
TabletManagement.CONFIGURED_COLUMNS
 +    // must be updated
 +    IteratorSetting tabletChange =
 +        new IteratorSetting(1001, "ManagerTabletInfoIterator", 
TabletManagementIterator.class);
 +    tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, 
tabletMgmtParams.serialize());
 +    scanner.addScanIterator(tabletChange);
 +  }
 +
 +  public static TabletManagement decode(Entry<Key,Value> e) throws 
IOException {
 +    return new TabletManagement(e.getKey(), e.getValue());
 +  }
 +
 +  private IteratorEnvironment env;
 +
 +  private TabletManagementParameters tabletMgmtParams = null;
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
 +      IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    this.env = env;
 +    tabletMgmtParams =
 +        
TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION));
 +    compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(),
 +        tabletMgmtParams.getCompactionHints(), 
tabletMgmtParams.getSteadyTime());
 +    final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration());
 +    BalancerEnvironmentImpl benv =
-         new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext());
++        new BalancerEnvironmentImpl(((SystemIteratorEnvironment) env).getServerContext());
 +    try {
 +      balancer = Property.createInstanceFromPropertyName(conf, 
Property.MANAGER_TABLET_BALANCER,
 +          TabletBalancer.class, new DoNothingBalancer());
 +      balancer.init(benv);
 +    } catch (Exception e) {
 +      LOG.warn("Failed to create balancer falling back to {}", 
DoNothingBalancer.class, e);
 +      balancer = new DoNothingBalancer();
 +      balancer.init(benv);
 +    }
 +  }
 +
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
 +      throws IOException {
 +    if (range != null) {
 +      // This iterator sits on top of the WholeRowIterator (see 
configureScanner), so enforce
 +      // that the start and end keys in the Range only have a row component 
to the key.
 +      for (Key k : new Key[] {range.getStartKey(), range.getEndKey()}) {
 +        if (k != null && k.compareTo(new Key(k.getRow())) != 0) {
 +          throw new IllegalArgumentException(
 +              "TabletManagementIterator must be seeked with keys that only 
contain a row, supplied range: "
 +                  + range);
 +        }
 +      }
 +    }
 +    super.seek(range, columnFamilies, inclusive);
 +  }
 +
 +  @Override
 +  protected boolean filter(Text currentRow, List<Key> keys, List<Value> 
values) {
 +
 +    var keyIter = keys.listIterator();
 +    var kvIter = new Iterator<Map.Entry<Key,Value>>() {
 +      @Override
 +      public boolean hasNext() {
 +        return keyIter.hasNext();
 +      }
 +
 +      @Override
 +      public Entry<Key,Value> next() {
 +        var valueIdx = keyIter.nextIndex();
 +        var key = keyIter.next();
 +        return new AbstractMap.SimpleImmutableEntry<>(key, 
values.get(valueIdx));
 +      }
 +    };
 +
 +    final Set<ManagementAction> actions = new HashSet<>();
 +    final TabletMetadata tm =
 +        TabletMetadata.convertRow(kvIter, 
TabletManagement.CONFIGURED_COLUMNS, false, true);
 +
 +    Exception error = null;
 +    try {
 +      LOG.trace("Evaluating extent: {}", tm);
 +      computeTabletManagementActions(tm, actions);
 +    } catch (Exception e) {
 +      LOG.error("Error computing tablet management actions for extent: {}", 
tm.getExtent(), e);
 +      error = e;
 +    }
 +
 +    if (!actions.isEmpty() || error != null) {
 +      if (error != null) {
 +        // Insert the error into K,V pair representing
 +        // the tablet metadata.
 +        TabletManagement.addError((k, v) -> {
 +          keys.add(k);
 +          values.add(v);
 +        }, currentRow, error);
 +      } else if (!actions.isEmpty()) {
 +        // If we simply returned here, then the client would get the encoded 
K,V
 +        // from the WholeRowIterator. However, it would not know the 
reason(s) why
 +        // it was returned. Insert a K,V pair to represent the reasons. The 
client
 +        // can pull this K,V pair from the results by looking at the colf.
 +        TabletManagement.addActions((k, v) -> {
 +          keys.add(k);
 +          values.add(v);
 +        }, currentRow, actions);
 +      }
 +
 +      // This key is being created exactly the same way as the whole row 
iterator creates keys.
 +      // This is important for ensuring that seek works as expected in the 
continue case. See
 +      // WholeRowIterator seek function for details, it looks for keys w/o 
columns.
 +      LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), 
actions);
 +      return true;
 +    }
 +
 +    LOG.trace("No reason to return extent {}, continuing", tm.getExtent());
 +    return false;
 +  }
 +
 +  private static final Set<ManagementAction> REASONS_NOT_TO_SPLIT_OR_COMPACT =
 +      Collections.unmodifiableSet(EnumSet.of(ManagementAction.BAD_STATE,
 +          ManagementAction.NEEDS_VOLUME_REPLACEMENT, 
ManagementAction.NEEDS_RECOVERY));
 +
 +  /**
 +   * Evaluates whether or not this Tablet should be returned so that it can 
be acted upon by the
 +   * Manager
 +   */
 +  private void computeTabletManagementActions(final TabletMetadata tm,
 +      final Set<ManagementAction> reasonsToReturnThisTablet) {
 +
 +    if (tm.isFutureAndCurrentLocationSet()) {
 +      // no need to check everything, we are in a known state where we want 
to return everything.
 +      reasonsToReturnThisTablet.add(ManagementAction.BAD_STATE);
 +    }
 +
 +    if (!tm.getLogs().isEmpty() && (tm.getOperationId() == null
 +        || tm.getOperationId().getType() != TabletOperationType.DELETING)) {
 +      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_RECOVERY);
 +    }
 +
 +    if (VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), tm)) {
 +      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT);
 +    }
 +
 +    if (shouldReturnDueToLocation(tm)) {
 +      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
 +    }
 +
 +    if (tm.getOperationId() == null && 
tabletMgmtParams.isTableOnline(tm.getTableId())
 +        && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, 
reasonsToReturnThisTablet)) {
 +      try {
 +        if (shouldReturnDueToSplit(tm, 
this.env.getPluginEnv().getConfiguration(tm.getTableId()),
 +            splitConfig)) {
 +          reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING);
 +        }
 +        // important to call this since reasonsToReturnThisTablet is passed 
to it
 +        if (!compactionGenerator
 +            .generateJobs(tm, 
determineCompactionKinds(reasonsToReturnThisTablet)).isEmpty()) {
 +          reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING);
 +        }
 +      } catch (NullPointerException e) {
 +        LOG.info(
 +            "Unable to determine if tablet {} should split or compact, maybe 
table was deleted?",
 +            tm.getExtent());
 +      }
 +    }
 +  }
 +
 +  private static final Set<CompactionKind> ALL_COMPACTION_KINDS =
 +      Collections.unmodifiableSet(EnumSet.allOf(CompactionKind.class));
 +  private static final Set<CompactionKind> SPLIT_COMPACTION_KINDS;
 +
 +  static {
 +    var tmp = EnumSet.allOf(CompactionKind.class);
 +    tmp.remove(CompactionKind.SYSTEM);
 +    SPLIT_COMPACTION_KINDS = Collections.unmodifiableSet(tmp);
 +  }
 +
 +  public static Set<CompactionKind>
 +      determineCompactionKinds(Set<ManagementAction> 
reasonsToReturnThisTablet) {
 +    if (reasonsToReturnThisTablet.contains(ManagementAction.NEEDS_SPLITTING)) 
{
 +      return SPLIT_COMPACTION_KINDS;
 +    } else {
 +      return ALL_COMPACTION_KINDS;
 +    }
 +  }
 +
 +}
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index 671f16dc04,398e67c112..600b96a8fe
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@@ -18,34 -18,25 +18,35 @@@
   */
  package org.apache.accumulo.server.manager.state;
  
 -import java.util.ArrayList;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.AbstractMap;
  import java.util.Collection;
 -import java.util.Collections;
  import java.util.List;
  import java.util.Map;
 -
 -import org.apache.accumulo.core.clientImpl.ClientContext;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
  import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.SystemTables;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
  import org.apache.accumulo.core.metadata.schema.Ample;
  import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 -import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency;
 -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
 +import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 -import org.apache.accumulo.core.tabletserver.log.LogEntry;
 -import org.apache.accumulo.server.util.ManagerMetadataUtil;
 +import org.apache.accumulo.core.util.time.SteadyTime;
 +import org.apache.accumulo.server.ServerContext;
- import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
++import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl;
  import org.apache.hadoop.fs.Path;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -70,32 -60,14 +71,34 @@@ class ZooTabletStateStore extends Abstr
    }
  
    @Override
 -  public ClosableIterator<TabletLocationState> iterator() {
 +  public ClosableIterator<TabletManagement> iterator(List<Range> ranges,
 +      TabletManagementParameters parameters) {
 +    Preconditions.checkArgument(parameters.getLevel() == getLevel());
 +
-     final TabletIteratorEnvironment env = new TabletIteratorEnvironment(ctx, IteratorScope.scan,
-         ctx.getTableConfiguration(SystemTables.ROOT.tableId()), SystemTables.ROOT.tableId());
++    final SystemIteratorEnvironment env =
++        (SystemIteratorEnvironment) new SystemIteratorEnvironmentImpl.Builder(ctx)
++            .withScope(IteratorScope.scan).withTableId(SystemTables.ROOT.tableId()).build();
++
 +    final TabletManagementIterator tmi = new TabletManagementIterator();
 +    final AtomicBoolean closed = new AtomicBoolean(false);
 +
 +    try {
 +      final RootTabletMetadata rtm = RootTabletMetadata.read(ctx);
 +      final SortedMapIterator iter = new SortedMapIterator(rtm.toKeyValues());
 +      tmi.init(iter,
 +          Map.of(TabletManagementIterator.TABLET_GOAL_STATE_PARAMS_OPTION, 
parameters.serialize()),
 +          env);
 +      tmi.seek(new Range(), null, true);
 +    } catch (IOException e2) {
 +      throw new IllegalStateException(
 +          "Error setting up TabletManagementIterator for the root tablet", 
e2);
 +    }
  
 -    return new ClosableIterator<>() {
 -      boolean finished = false;
 +    return new ClosableIterator<TabletManagement>() {
  
        @Override
 -      public boolean hasNext() {
 -        return !finished;
 +      public void close() {
 +        closed.compareAndSet(false, true);
        }
  
        @Override
diff --cc server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java
index dac390623d,f395c4134a..6eade82ffb
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java
@@@ -46,8 -47,8 +47,8 @@@ import org.apache.accumulo.core.iterato
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.conf.TableConfiguration;
  import 
org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig;
 +import org.apache.accumulo.server.data.ServerConditionalMutation;
- import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+ import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl;
 -import org.apache.accumulo.tserver.data.ServerConditionalMutation;
  import org.apache.hadoop.io.Text;
  
  public class ConditionCheckerContext {
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
index 62498e5ab2,d109c97d9c..6df4150fc4
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
@@@ -38,28 -40,44 +38,44 @@@ public class ExtCEnv implements Compact
  
    private final CompactionJobHolder jobHolder;
    private final TExternalCompactionJob job;
 -  private final String queueName;
 +  private final String groupName;
  
-   public static class CompactorIterEnv extends TabletIteratorEnvironment {
+   public static class CompactorIterEnv extends SystemIteratorEnvironmentImpl {
+ 
+     private static class Builder extends SystemIteratorEnvironmentImpl.Builder {
+ 
 -      private final String queueName;
++      private final String groupName;
+ 
 -      public Builder(ServerContext context, String queueName) {
++      public Builder(ServerContext context, String groupName) {
+         super(context);
 -        this.queueName = queueName;
++        this.groupName = groupName;
+       }
+ 
+       @Override
+       public SystemIteratorEnvironmentImpl build() {
+         return new CompactorIterEnv(this);
+       }
+ 
+     }
  
 -    private final String queueName;
 +    private final String groupName;
  
-     public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC,
-         AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String groupName) {
-       super(context, scope, fullMajC, tableConfig, tableId, kind);
-       this.groupName = groupName;
+     public CompactorIterEnv(Builder builder) {
+       super(builder);
 -      this.queueName = builder.queueName;
++      this.groupName = builder.groupName;
      }
  
      @VisibleForTesting
      public String getQueueName() {
 -      return queueName;
 +      return groupName;
      }
+ 
    }
  
 -  ExtCEnv(CompactionJobHolder jobHolder, String queueName) {
 +  ExtCEnv(CompactionJobHolder jobHolder, String groupName) {
      this.jobHolder = jobHolder;
      this.job = jobHolder.getJob();
 -    this.queueName = queueName;
 +    this.groupName = groupName;
    }
  
    @Override
@@@ -75,9 -93,29 +91,19 @@@
    @Override
    public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
        AccumuloConfiguration acuTableConf, TableId tableId) {
-     return new CompactorIterEnv(context, IteratorScope.majc,
-         !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
-         CompactionKind.valueOf(job.getKind().name()), groupName);
+ 
 -    CompactorIterEnv.Builder builder = new CompactorIterEnv.Builder(context, queueName);
++    CompactorIterEnv.Builder builder = new CompactorIterEnv.Builder(context, groupName);
+     builder.withScope(IteratorScope.majc).withTableId(tableId);
+ 
+     if (CompactionKind.valueOf(job.getKind().name()) == CompactionKind.USER) {
+       builder.isUserCompaction();
+     }
+ 
+     if (!jobHolder.getJob().isPropagateDeletes()) {
+       builder.isFullMajorCompaction();
+     }
+ 
+     return builder.build();
    }
  
    @Override
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
index a33694ad1e,79d58cf6b8..ce729592af
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
@@@ -56,10 -56,9 +56,10 @@@ import org.apache.accumulo.core.sample.
  import org.apache.accumulo.core.sample.impl.SamplerFactory;
  import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
  import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.cache.Caches;
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.conf.TableConfiguration;
- import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+ import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl;
  import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Text;
diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
index 03a6697f70,0000000000..1c4624bf38
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
@@@ -1,86 -1,0 +1,88 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +
 +import org.apache.accumulo.compactor.Compactor;
 +import org.apache.accumulo.core.cli.ConfigOpts;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.compaction.thrift.CompactorService;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.metadata.SystemTables;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
- import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
++import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl;
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class MemoryConsumingCompactor extends Compactor {
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryConsumingCompactor.class);
 +
 +  MemoryConsumingCompactor(ConfigOpts opts, String[] args) {
 +    super(opts, args);
 +  }
 +
 +  @Override
 +  protected CompactorService.Iface getCompactorThriftHandlerInterface() {
 +    return this;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String 
externalCompactionId)
 +      throws TException {
 +    // Use the cancel Thrift RPC to free the consumed memory
 +    LOG.warn("cancel called, freeing memory");
 +    MemoryConsumingIterator.freeBuffers();
 +  }
 +
 +  @Override
 +  public TExternalCompactionJob getRunningCompaction(TInfo tinfo, 
TCredentials credentials)
 +      throws ThriftSecurityException, TException {
 +    // Use the getRunningCompaction Thrift RPC to consume the memory
 +    LOG.warn("getRunningCompaction called, consuming memory");
 +    try {
++      SystemIteratorEnvironment env =
++          (SystemIteratorEnvironment) new SystemIteratorEnvironmentImpl.Builder(getContext())
++              .withScope(IteratorScope.scan).withTableId(SystemTables.METADATA.tableId()).build();
++
 +      MemoryConsumingIterator iter = new MemoryConsumingIterator();
-       iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(),
-           new TabletIteratorEnvironment(getContext(), IteratorScope.scan,
-               getContext().getTableConfiguration(SystemTables.METADATA.tableId()),
-               SystemTables.METADATA.tableId()));
++      iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(), env);
 +      iter.consume();
 +    } catch (IOException e) {
 +      throw new TException("Error consuming memory", e);
 +    }
 +    return new TExternalCompactionJob();
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    try (var compactor = new MemoryConsumingCompactor(new ConfigOpts(), 
args)) {
 +      compactor.runServer();
 +    }
 +  }
 +
 +}

Reply via email to