This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 094c9b09d5 Add tests for CompactionDriver cleanupTabletMetadata (#4798)
094c9b09d5 is described below
commit 094c9b09d5c0004c833a8b0de289b6f824206790
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Aug 12 08:09:51 2024 -0400
Add tests for CompactionDriver cleanupTabletMetadata (#4798)
This commit adds tests using TestAmple to verify that the
cleanupTabletMetadata method, which is called from undo, properly cleans
up any compaction metadata for a given fate Id across the tablets that
match the range provided to the CompactionDriver
This closes #4795
---
.../manager/tableOps/compact/CompactionDriver.java | 2 +-
.../compaction/ExternalCompactionTestUtils.java | 27 +++--
.../apache/accumulo/test/fate/ManagerRepoIT.java | 134 +++++++++++++++++++++
3 files changed, 152 insertions(+), 11 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 74fd66b49d..990b1a0d74 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-class CompactionDriver extends ManagerRepo {
+public class CompactionDriver extends ManagerRepo {
private static final Logger log =
LoggerFactory.getLogger(CompactionDriver.class);
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 20e6efc499..256751b036 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -370,18 +370,25 @@ public class ExternalCompactionTestUtils {
public static void assertNoCompactionMetadata(ServerContext ctx, String
tableName) {
var tableId =
TableId.of(ctx.tableOperations().tableIdMap().get(tableName));
try (var tabletsMetadata =
ctx.getAmple().readTablets().forTable(tableId).build()) {
+ assertNoCompactionMetadata(tabletsMetadata);
+ }
+ }
- int count = 0;
+ public static void assertNoCompactionMetadata(TabletsMetadata
tabletsMetadata) {
+ int count = 0;
- for (var tabletMetadata : tabletsMetadata) {
- assertEquals(Set.of(), tabletMetadata.getCompacted());
- assertNull(tabletMetadata.getSelectedFiles());
- assertEquals(Set.of(),
tabletMetadata.getExternalCompactions().keySet());
- assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested());
- count++;
- }
-
- assertTrue(count > 0);
+ for (var tabletMetadata : tabletsMetadata) {
+ assertNoCompactionMetadata(tabletMetadata);
+ count++;
}
+
+ assertTrue(count > 0);
+ }
+
+ public static void assertNoCompactionMetadata(TabletMetadata tabletMetadata)
{
+ assertEquals(Set.of(), tabletMetadata.getCompacted());
+ assertNull(tabletMetadata.getSelectedFiles());
+ assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet());
+ assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
index 50ee816e86..a0cf45ee79 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -23,6 +23,7 @@ import static
org.apache.accumulo.core.client.ConditionalWriter.Status.UNKNOWN;
import static org.apache.accumulo.test.ample.TestAmpleUtil.mockWithAmple;
import static
org.apache.accumulo.test.ample.metadata.ConditionalWriterInterceptor.withStatus;
import static org.apache.accumulo.test.ample.metadata.TestAmple.not;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -31,10 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.BatchWriter;
@@ -43,19 +49,28 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.manager.tableOps.compact.CompactionDriver;
import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
@@ -74,6 +89,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.collect.Sets;
public class ManagerRepoIT extends SharedMiniClusterBase {
@@ -307,6 +325,122 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
}
}
+ @ParameterizedTest
+ @MethodSource("compactionDriverRanges")
+ public void testCompactionDriverCleanup(Pair<Text,Text> rangeText) throws
Exception {
+ // Create a range for the test and generate table names
+ String[] tableNames = getUniqueNames(2);
+ var range = new Range(rangeText.getFirst(), true, rangeText.getSecond(),
false);
+ var tableSuffix = (range.isInfiniteStartKey() ? "" :
rangeText.getFirst().toString())
+ + (range.isInfiniteStopKey() ? "" : rangeText.getSecond().toString());
+ String metadataTable = tableNames[0] + tableSuffix;
+ String userTable = tableNames[1] + tableSuffix;
+
+ try (ClientContext client =
+ (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+
+ // Create a table with 4 splits
+ var splits = Sets.newTreeSet(Arrays.asList(new Text("d"), new Text("m"),
new Text("s")));
+ client.tableOperations().create(userTable, new
NewTableConfiguration().withSplits(splits));
+ TableId tableId =
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+ // Set up Test ample and manager
+ TestAmple.createMetadataTable(client, metadataTable);
+ TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+ .create(getCluster().getServerContext(), Map.of(DataLevel.USER,
metadataTable));
+ testAmple.createMetadataFromExisting(client, tableId);
+ Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ var ctx = manager.getContext();
+
+ // Create the CompactionDriver to test with the given range passed into
the method
+ final ManagerRepo repo = new
CompactionDriver(ctx.getNamespaceId(tableId), tableId,
+ !range.isInfiniteStartKey() ?
range.getStartKey().getRow().getBytes() : null,
+ !range.isInfiniteStopKey() ? range.getEndKey().getRow().getBytes() :
null);
+
+ // Create a couple random fateIds and generate compaction metadata for
+ // the first fateId for all 4 tablets in the table
+ var fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+ var fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+ createCompactionMetadata(testAmple, tableId, fateId1);
+
+ // Verify there are 4 tablets and each tablet has compaction metadata
generated
+ // for the first fateId and store all the extents in a set
+ Set<KeyExtent> extents = new HashSet<>();
+ try (TabletsMetadata tabletsMetadata =
testAmple.readTablets().forTable(tableId).build()) {
+ assertEquals(4, tabletsMetadata.stream().count());
+ tabletsMetadata.forEach(tm -> {
+ extents.add(tm.getExtent());
+ assertHasCompactionMetadata(fateId1, tm);
+ });
+ }
+ assertEquals(4, extents.size());
+
+ // First call undo using the second fateId and verify there's still
metadata for the first one
+ repo.undo(fateId2, manager);
+ try (TabletsMetadata tabletsMetadata =
testAmple.readTablets().forTable(tableId).build()) {
+ tabletsMetadata.forEach(tm -> {
+ assertHasCompactionMetadata(fateId1, tm);
+ });
+ }
+
+ // Now call undo on the first fateId which would clean up all the
metadata for all the
+ // tablets that overlap with the given range that was provided to the
CompactionDriver
+ // during the creation of the repo
+ repo.undo(fateId1, manager);
+
+ // First, iterate over only the overlapping tablets and verify that
those tablets
+ // were cleaned up and remove any visited tablets from the extents set
+ try (var tabletsMetadata = testAmple.readTablets().forTable(tableId)
+ .overlapping(rangeText.getFirst(), rangeText.getSecond()).build()) {
+ tabletsMetadata.forEach(tm -> {
+ extents.remove(tm.getExtent());
+ assertNoCompactionMetadata(tm);
+ });
+ }
+
+ // Second, for any remaining tablets that did not overlap the range
provided,
+ // verify that metadata still exists as the CompactionDriver should not
have cleaned
+ // up tablets that did not overlap the given range
+ try (var tabletsMetadata =
+ testAmple.readTablets().forTablets(extents,
Optional.empty()).build()) {
+ tabletsMetadata.forEach(tm -> {
+ extents.remove(tm.getExtent());
+ assertHasCompactionMetadata(fateId1, tm);
+ });
+ }
+
+ // Verify all the tablets in the table were checked for correct metadata
after undo
+ assertTrue(extents.isEmpty());
+ }
+ }
+
+ private void createCompactionMetadata(Ample testAmple, TableId tableId,
FateId fateId) {
+ var stf1 = StoredTabletFile.of(new org.apache.hadoop.fs.Path(
+
"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000050.rf"));
+ var stf2 = StoredTabletFile.of(new org.apache.hadoop.fs.Path(
+
"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+
+ try (TabletsMetadata tabletsMetadata =
testAmple.readTablets().forTable(tableId).build();
+ TabletsMutator tabletsMutator = testAmple.mutateTablets()) {
+ var selectedFiles = new SelectedFiles(Set.of(stf1, stf2), true, fateId,
+ SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+ tabletsMetadata.forEach(tm ->
tabletsMutator.mutateTablet(tm.getExtent()).putCompacted(fateId)
+
.putUserCompactionRequested(fateId).putSelectedFiles(selectedFiles).mutate());
+ }
+ }
+
+ private void assertHasCompactionMetadata(FateId fateId, TabletMetadata tm) {
+ assertEquals(Set.of(fateId), tm.getCompacted());
+ assertNotNull(tm.getSelectedFiles());
+ assertEquals(Set.of(fateId), tm.getUserCompactionsRequested());
+ }
+
+ // Create a few ranges to test the CompactionDriver cleanup against
+ private static Stream<Pair<Text,Text>> compactionDriverRanges() {
+ return Stream.of(new Pair<>(null, null), new Pair<>(null, new Text("d")),
+ new Pair<>(new Text("dd"), new Text("mm")), new Pair<>(new Text("s"),
null));
+ }
+
private void createUnsplittableTable(ClientContext client, String table)
throws Exception {
// make a table and lower the configuration properties
// @formatter:off