This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 88b108e9ac Narrow the files checked by compaction commit (#5153)
88b108e9ac is described below
commit 88b108e9ac8fe00b231d53cf236eb794b448d922
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Dec 9 08:11:22 2024 -0500
Narrow the files checked by compaction commit (#5153)
Previously, the conditional mutation that commits a compaction required
all files in the tablet to be the same as those read earlier. On a busy
tablet this check could fail and be retried often. The check has now been
narrowed to verify only that the files involved in the compaction still
exist. A new method, requireFiles(Set<StoredTabletFile> files), was added
to Ample; it creates a condition for each file column to verify that each
file exists.
This closes #5117
---
.../accumulo/core/metadata/schema/Ample.java | 6 +++
.../metadata/ConditionalTabletMutatorImpl.java | 14 ++++++
.../coordinator/commit/CommitCompaction.java | 5 ++-
.../coordinator/commit/CompactionCommitData.java | 5 +--
.../test/functional/AmpleConditionalWriterIT.java | 51 ++++++++++++++++++++++
5 files changed, 76 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 888775aed5..9528ec9974 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.metadata.schema;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -514,6 +515,11 @@ public interface Ample {
ConditionalTabletMutator requireAbsentLogs();
+ /**
+ * Require that the tablet contains every file in the given set; its other files are not checked
+ */
+ ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);
+
/**
* <p>
* Ample provides the following features on top of the conditional writer
to help automate
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 9978dbbe7c..da8e050504 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -45,7 +45,9 @@ import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedFilesIterator;
import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
@@ -330,6 +332,18 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
return this;
}
+ @Override
+ public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) { // adds one condition per file in the set
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
+ IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO,
PresentIterator.class); // this single iterator setting is shared by every condition built below
+ for (StoredTabletFile file : files) {
+ Condition c = new Condition(DataFileColumnFamily.STR_NAME,
file.getMetadata()) // column is the data file family with the file's metadata string as qualifier
+ .setValue(PresentIterator.VALUE).setIterators(is);
+ mutation.addCondition(c); // the conditional mutation ends up carrying one condition for each required file
+ }
+ return this; // returns this mutator so further calls can be chained
+ }
+
@Override
public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 0daa221f1f..cb9492d7e3 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -110,7 +110,7 @@ public class CommitCompaction extends ManagerRepo {
while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
- // the compacted files should not exists in the tablet already
+ // the compacted files should not exist in the tablet already
var tablet2 = tablet;
newDatafile.ifPresent(
newFile ->
Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
@@ -118,7 +118,8 @@ public class CommitCompaction extends ManagerRepo {
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator =
tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
- .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
+ .requireCompaction(ecid).requireSame(tablet, LOCATION)
+ .requireFiles(commitData.getJobFiles());
if (ecm.getKind() == CompactionKind.USER) {
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
index 23b293c25e..0e7587d633 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.manager.compaction.coordinator.commit;
import java.io.Serializable;
-import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
@@ -56,7 +55,7 @@ public class CompactionCommitData implements Serializable {
return KeyExtent.fromThrift(textent).tableId();
}
- public Collection<StoredTabletFile> getJobFiles() {
- return
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList());
+ public Set<StoredTabletFile> getJobFiles() { // maps each entry of inputPaths through StoredTabletFile.of and collects the results into a Set
+ return
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index a343b94854..ca0baf7435 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -1746,4 +1746,55 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
}
}
}
+
+ /**
+ * Verifies that requireFiles() accepts a mutation when every listed file is in the tablet, even
+ * when the tablet has additional files, and rejects it when any listed file is missing.
+ */
+ @Test
+ public void testRequiresFiles() {
+ var context = cluster.getServerContext();
+
+ var stf1 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+ var stf2 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
+ var stf3 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
+ var stf4 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
+ var dfv = new DataFileValue(100, 100);
+
+ // Add 3 of the files, skip the 4th file
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1,
dfv).putFile(stf2, dfv)
+ .putFile(stf3, dfv).submit(tm -> false);
+ assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+ }
+ assertEquals(Set.of(stf1, stf2, stf3),
context.getAmple().readTablet(e1).getFiles());
+
+ // Test mutation is accepted when given all files
+ var time1 = MetadataTime.parse("L50");
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf2, stf3))
+ .putTime(time1).submit(tm -> false);
+ assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+ }
+ assertEquals(time1, context.getAmple().readTablet(e1).getTime());
+
+ // Test mutation is accepted when a subset of files is given
+ var time2 = MetadataTime.parse("L60");
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf3)).putTime(time2)
+ .submit(tm -> false);
+ assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+ }
+ assertEquals(time2, context.getAmple().readTablet(e1).getTime());
+
+ // Test mutation is rejected when given a file that the tablet does not have
+ // time3 must differ from time2, otherwise the final check could not detect an applied mutation
+ var time3 = MetadataTime.parse("L70");
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1,
stf4)).putTime(time3)
+ .submit(tm -> false);
+ assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+ }
+ // Should still be time2 (not time3) because the mutation was rejected
+ assertEquals(time2, context.getAmple().readTablet(e1).getTime());
+ }
}