This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 52de3d5637411d64b2d7dcd3040d5c798a16f073 Merge: 7e843611d2 b3aed75b87 Author: Dave Marion <[email protected]> AuthorDate: Thu Jun 5 13:24:59 2025 +0000 Merge branch '2.1' .../iteratorsImpl/ClientIteratorEnvironment.java | 23 +- .../iterators/SystemIteratorEnvironmentImpl.java | 131 +++++++++++ .../iterators/TabletIteratorEnvironment.java | 241 --------------------- .../manager/state/TabletManagementIterator.java | 4 +- .../server/manager/state/ZooTabletStateStore.java | 9 +- .../server/tablets/ConditionCheckerContext.java | 13 +- .../org/apache/accumulo/compactor/ExtCEnv.java | 44 +++- .../apache/accumulo/tserver/tablet/MinCEnv.java | 8 +- .../accumulo/tserver/tablet/ScanDataSource.java | 15 +- .../apache/accumulo/tserver/InMemoryMapTest.java | 99 ++++----- .../test/functional/MemoryConsumingCompactor.java | 12 +- .../test/performance/scan/CollectTabletStats.java | 7 +- 12 files changed, 273 insertions(+), 333 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java index 417a38ffcd,a4778441ea..e593ea8f41 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/ClientIteratorEnvironment.java @@@ -28,10 -29,14 +28,11 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.security.Authorizations; + import org.apache.accumulo.core.spi.common.ServiceEnvironment; public class ClientIteratorEnvironment implements IteratorEnvironment { diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index e542a3a2ea,0000000000..ca5b0bafbc mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@@ -1,345 -1,0 +1,345 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.PluginEnvironment.Configuration; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; +import org.apache.accumulo.core.spi.balancer.DoNothingBalancer; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.fs.VolumeUtil; - import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; ++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; +import org.apache.accumulo.server.split.SplitUtils; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iterator used by the TabletGroupWatcher threads in the Manager. This iterator returns + * TabletManagement objects for each Tablet that needs some type of action performed on it by the + * Manager. + */ +public class TabletManagementIterator extends WholeRowIterator { + private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class); + public static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams"; + private CompactionJobGenerator compactionGenerator; + private TabletBalancer balancer; + private final SplitConfig splitConfig = new SplitConfig(); + + private static class SplitConfig { + TableId tableId; + long splitThreshold; + long maxEndRowSize; + int maxFilesToOpen; + + void update(TableId tableId, Configuration tableConfig) { + if (!tableId.equals(this.tableId)) { + this.tableId = tableId; + splitThreshold = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + maxEndRowSize = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); + maxFilesToOpen = (int) ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey())); + } + } + } + + private static boolean shouldReturnDueToSplit(final TabletMetadata tm, + final Configuration tableConfig, SplitConfig splitConfig) { + + // Should see the same table many times in a row, so this should only read config the first time + // seen. Reading the config for each tablet was showing up as performance problem when profiling + // SplitMillionIT that reads one million tablets. It is also nice to have snapshot of config + // that is used for all tablet in a table. + splitConfig.update(tm.getTableId(), tableConfig); + + // If the current computed metadata matches the current marker then we can't split, + // so we return false. If the marker is set but doesn't match then return true + // which gives a chance to clean up the marker and recheck. + var unsplittable = tm.getUnSplittable(); + if (unsplittable != null) { + return !unsplittable + .equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), splitConfig.splitThreshold, + splitConfig.maxEndRowSize, splitConfig.maxFilesToOpen, tm.getFiles())); + } + + // If unsplittable is not set at all then check if over split threshold + final boolean shouldSplit = SplitUtils.needsSplit(splitConfig.splitThreshold, tm); + LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), + tm.getFileSize(), splitConfig.splitThreshold, shouldSplit); + return shouldSplit; + } + + private boolean shouldReturnDueToLocation(final TabletMetadata tm) { + + if (tm.getExtent().isRootTablet()) { + return true; + } + + if (tm.getMigration() != null) { + // Ideally only the state and goalState would need to be used to determine if a tablet should + // be returned. However, the Manager/TGW currently needs everything currently migrating + // returned so it can update the migrations. If this were improved then this case would not + // be needed. + return true; + } + + TabletState state = TabletState.compute(tm, tabletMgmtParams.getOnlineTsevers()); + TabletGoalState goalState = TabletGoalState.compute(tm, state, balancer, tabletMgmtParams); + if (LOG.isTraceEnabled()) { + LOG.trace( + "extent:{} state:{} goalState:{} tabletAvailability:{}, hostingRequested: {}, opId: {}", + tm.getExtent(), state, goalState, tm.getTabletAvailability(), tm.getHostingRequested(), + tm.getOperationId()); + } + + switch (goalState) { + case HOSTED: + return state != TabletState.HOSTED; + case SUSPENDED: + return state != TabletState.SUSPENDED; + case UNASSIGNED: + return state != TabletState.UNASSIGNED; + default: + throw new IllegalStateException("unknown goal state " + goalState); + } + } + + public static void configureScanner(final ScannerBase scanner, + final TabletManagementParameters tabletMgmtParams) { + // Note : if the scanner is ever made to fetch columns, then TabletManagement.CONFIGURED_COLUMNS + // must be updated + IteratorSetting tabletChange = + new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); + tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, tabletMgmtParams.serialize()); + scanner.addScanIterator(tabletChange); + } + + public static TabletManagement decode(Entry<Key,Value> e) throws IOException { + return new TabletManagement(e.getKey(), e.getValue()); + } + + private IteratorEnvironment env; + + private TabletManagementParameters tabletMgmtParams = null; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + this.env = env; + tabletMgmtParams = + TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION)); + compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(), + tabletMgmtParams.getCompactionHints(), tabletMgmtParams.getSteadyTime()); + final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration()); + BalancerEnvironmentImpl benv = - new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); ++ new BalancerEnvironmentImpl(((SystemIteratorEnvironment) env).getServerContext()); + try { + balancer = Property.createInstanceFromPropertyName(conf, Property.MANAGER_TABLET_BALANCER, + TabletBalancer.class, new DoNothingBalancer()); + balancer.init(benv); + } catch (Exception e) { + LOG.warn("Failed to create balancer falling back to {}", DoNothingBalancer.class, e); + balancer = new DoNothingBalancer(); + balancer.init(benv); + } + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) + throws IOException { + if (range != null) { + // This iterator sits on top of the WholeRowIterator (see configureScanner), so enforce + // that the start and end keys in the Range only have a row component to the key. + for (Key k : new Key[] {range.getStartKey(), range.getEndKey()}) { + if (k != null && k.compareTo(new Key(k.getRow())) != 0) { + throw new IllegalArgumentException( + "TabletManagementIterator must be seeked with keys that only contain a row, supplied range: " + + range); + } + } + } + super.seek(range, columnFamilies, inclusive); + } + + @Override + protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { + + var keyIter = keys.listIterator(); + var kvIter = new Iterator<Map.Entry<Key,Value>>() { + @Override + public boolean hasNext() { + return keyIter.hasNext(); + } + + @Override + public Entry<Key,Value> next() { + var valueIdx = keyIter.nextIndex(); + var key = keyIter.next(); + return new AbstractMap.SimpleImmutableEntry<>(key, values.get(valueIdx)); + } + }; + + final Set<ManagementAction> actions = new HashSet<>(); + final TabletMetadata tm = + TabletMetadata.convertRow(kvIter, TabletManagement.CONFIGURED_COLUMNS, false, true); + + Exception error = null; + try { + LOG.trace("Evaluating extent: {}", tm); + computeTabletManagementActions(tm, actions); + } catch (Exception e) { + LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); + error = e; + } + + if (!actions.isEmpty() || error != null) { + if (error != null) { + // Insert the error into K,V pair representing + // the tablet metadata. + TabletManagement.addError((k, v) -> { + keys.add(k); + values.add(v); + }, currentRow, error); + } else if (!actions.isEmpty()) { + // If we simply returned here, then the client would get the encoded K,V + // from the WholeRowIterator. However, it would not know the reason(s) why + // it was returned. Insert a K,V pair to represent the reasons. The client + // can pull this K,V pair from the results by looking at the colf. + TabletManagement.addActions((k, v) -> { + keys.add(k); + values.add(v); + }, currentRow, actions); + } + + // This key is being created exactly the same way as the whole row iterator creates keys. + // This is important for ensuring that seek works as expected in the continue case. See + // WholeRowIterator seek function for details, it looks for keys w/o columns. + LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), actions); + return true; + } + + LOG.trace("No reason to return extent {}, continuing", tm.getExtent()); + return false; + } + + private static final Set<ManagementAction> REASONS_NOT_TO_SPLIT_OR_COMPACT = + Collections.unmodifiableSet(EnumSet.of(ManagementAction.BAD_STATE, + ManagementAction.NEEDS_VOLUME_REPLACEMENT, ManagementAction.NEEDS_RECOVERY)); + + /** + * Evaluates whether or not this Tablet should be returned so that it can be acted upon by the + * Manager + */ + private void computeTabletManagementActions(final TabletMetadata tm, + final Set<ManagementAction> reasonsToReturnThisTablet) { + + if (tm.isFutureAndCurrentLocationSet()) { + // no need to check everything, we are in a known state where we want to return everything. + reasonsToReturnThisTablet.add(ManagementAction.BAD_STATE); + } + + if (!tm.getLogs().isEmpty() && (tm.getOperationId() == null + || tm.getOperationId().getType() != TabletOperationType.DELETING)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_RECOVERY); + } + + if (VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), tm)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT); + } + + if (shouldReturnDueToLocation(tm)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); + } + + if (tm.getOperationId() == null && tabletMgmtParams.isTableOnline(tm.getTableId()) + && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, reasonsToReturnThisTablet)) { + try { + if (shouldReturnDueToSplit(tm, this.env.getPluginEnv().getConfiguration(tm.getTableId()), + splitConfig)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); + } + // important to call this since reasonsToReturnThisTablet is passed to it + if (!compactionGenerator + .generateJobs(tm, determineCompactionKinds(reasonsToReturnThisTablet)).isEmpty()) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING); + } + } catch (NullPointerException e) { + LOG.info( + "Unable to determine if tablet {} should split or compact, maybe table was deleted?", + tm.getExtent()); + } + } + } + + private static final Set<CompactionKind> ALL_COMPACTION_KINDS = + Collections.unmodifiableSet(EnumSet.allOf(CompactionKind.class)); + private static final Set<CompactionKind> SPLIT_COMPACTION_KINDS; + + static { + var tmp = EnumSet.allOf(CompactionKind.class); + tmp.remove(CompactionKind.SYSTEM); + SPLIT_COMPACTION_KINDS = Collections.unmodifiableSet(tmp); + } + + public static Set<CompactionKind> + determineCompactionKinds(Set<ManagementAction> reasonsToReturnThisTablet) { + if (reasonsToReturnThisTablet.contains(ManagementAction.NEEDS_SPLITTING)) { + return SPLIT_COMPACTION_KINDS; + } else { + return ALL_COMPACTION_KINDS; + } + } + +} diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index 671f16dc04,398e67c112..600b96a8fe --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@@ -18,34 -18,25 +18,35 @@@ */ package org.apache.accumulo.server.manager.state; -import java.util.ArrayList; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.AbstractMap; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; - -import org.apache.accumulo.core.clientImpl.ClientContext; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency; -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; +import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.util.ManagerMetadataUtil; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.server.ServerContext; - import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; ++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; ++import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -70,32 -60,14 +71,34 @@@ class ZooTabletStateStore extends Abstr } @Override - public ClosableIterator<TabletLocationState> iterator() { + public ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters) { + Preconditions.checkArgument(parameters.getLevel() == getLevel()); + - final TabletIteratorEnvironment env = new TabletIteratorEnvironment(ctx, IteratorScope.scan, - ctx.getTableConfiguration(SystemTables.ROOT.tableId()), SystemTables.ROOT.tableId()); ++ final SystemIteratorEnvironment env = ++ (SystemIteratorEnvironment) new SystemIteratorEnvironmentImpl.Builder(ctx) ++ .withScope(IteratorScope.scan).withTableId(SystemTables.ROOT.tableId()).build(); ++ + final TabletManagementIterator tmi = new TabletManagementIterator(); + final AtomicBoolean closed = new AtomicBoolean(false); + + try { + final RootTabletMetadata rtm = RootTabletMetadata.read(ctx); + final SortedMapIterator iter = new SortedMapIterator(rtm.toKeyValues()); + tmi.init(iter, + Map.of(TabletManagementIterator.TABLET_GOAL_STATE_PARAMS_OPTION, parameters.serialize()), + env); + tmi.seek(new Range(), null, true); + } catch (IOException e2) { + throw new IllegalStateException( + "Error setting up TabletManagementIterator for the root tablet", e2); + } - return new ClosableIterator<>() { - boolean finished = false; + return new ClosableIterator<TabletManagement>() { @Override - public boolean hasNext() { - return !finished; + public void close() { + closed.compareAndSet(false, true); } @Override diff --cc server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java index dac390623d,f395c4134a..6eade82ffb --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/ConditionCheckerContext.java @@@ -46,8 -47,8 +47,8 @@@ import org.apache.accumulo.core.iterato import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig; +import org.apache.accumulo.server.data.ServerConditionalMutation; - import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; + import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl; -import org.apache.accumulo.tserver.data.ServerConditionalMutation; import org.apache.hadoop.io.Text; public class ConditionCheckerContext { diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java index 62498e5ab2,d109c97d9c..6df4150fc4 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java @@@ -38,28 -40,44 +38,44 @@@ public class ExtCEnv implements Compact private final CompactionJobHolder jobHolder; private final TExternalCompactionJob job; - private final String queueName; + private final String groupName; - public static class CompactorIterEnv extends TabletIteratorEnvironment { + public static class CompactorIterEnv extends SystemIteratorEnvironmentImpl { + + private static class Builder extends SystemIteratorEnvironmentImpl.Builder { + - private final String queueName; ++ private final String groupName; + - public Builder(ServerContext context, String queueName) { ++ public Builder(ServerContext context, String groupName) { + super(context); - this.queueName = queueName; ++ this.groupName = groupName; + } + + @Override + public SystemIteratorEnvironmentImpl build() { + return new CompactorIterEnv(this); + } + + } - private final String queueName; + private final String groupName; - public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC, - AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String groupName) { - super(context, scope, fullMajC, tableConfig, tableId, kind); - this.groupName = groupName; + public CompactorIterEnv(Builder builder) { + super(builder); - this.queueName = builder.queueName; ++ this.groupName = builder.groupName; } @VisibleForTesting public String getQueueName() { - return queueName; + return groupName; } + } - ExtCEnv(CompactionJobHolder jobHolder, String queueName) { + ExtCEnv(CompactionJobHolder jobHolder, String groupName) { this.jobHolder = jobHolder; this.job = jobHolder.getJob(); - this.queueName = queueName; + this.groupName = groupName; } @Override @@@ -75,9 -93,29 +91,19 @@@ @Override public SystemIteratorEnvironment createIteratorEnv(ServerContext context, AccumuloConfiguration acuTableConf, TableId tableId) { - return new CompactorIterEnv(context, IteratorScope.majc, - !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId, - CompactionKind.valueOf(job.getKind().name()), groupName); + - CompactorIterEnv.Builder builder = new CompactorIterEnv.Builder(context, queueName); ++ CompactorIterEnv.Builder builder = new CompactorIterEnv.Builder(context, groupName); + builder.withScope(IteratorScope.majc).withTableId(tableId); + + if (CompactionKind.valueOf(job.getKind().name()) == CompactionKind.USER) { + builder.isUserCompaction(); + } + + if (!jobHolder.getJob().isPropagateDeletes()) { + builder.isFullMajorCompaction(); + } + + return builder.build(); } @Override diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java index a33694ad1e,79d58cf6b8..ce729592af --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java @@@ -56,10 -56,9 +56,10 @@@ import org.apache.accumulo.core.sample. import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; - import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; + import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl; import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java index 03a6697f70,0000000000..1c4624bf38 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java @@@ -1,86 -1,0 +1,88 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.CompactorService; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; - import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; ++import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; ++import org.apache.accumulo.server.iterators.SystemIteratorEnvironmentImpl; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemoryConsumingCompactor extends Compactor { + + private static final Logger LOG = LoggerFactory.getLogger(MemoryConsumingCompactor.class); + + MemoryConsumingCompactor(ConfigOpts opts, String[] args) { + super(opts, args); + } + + @Override + protected CompactorService.Iface getCompactorThriftHandlerInterface() { + return this; + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + // Use the cancel Thrift RPC to free the consumed memory + LOG.warn("cancel called, freeing memory"); + MemoryConsumingIterator.freeBuffers(); + } + + @Override + public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException, TException { + // Use the getRunningCompaction Thrift RPC to consume the memory + LOG.warn("getRunningCompaction called, consuming memory"); + try { ++ SystemIteratorEnvironment env = ++ (SystemIteratorEnvironment) new SystemIteratorEnvironmentImpl.Builder(getContext()) ++ .withScope(IteratorScope.scan).withTableId(SystemTables.METADATA.tableId()).build(); ++ + MemoryConsumingIterator iter = new MemoryConsumingIterator(); - iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(), - new TabletIteratorEnvironment(getContext(), IteratorScope.scan, - getContext().getTableConfiguration(SystemTables.METADATA.tableId()), - SystemTables.METADATA.tableId())); ++ iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(), env); + iter.consume(); + } catch (IOException e) { + throw new TException("Error consuming memory", e); + } + return new TExternalCompactionJob(); + } + + public static void main(String[] args) throws Exception { + try (var compactor = new MemoryConsumingCompactor(new ConfigOpts(), args)) { + compactor.runServer(); + } + } + +}
