This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 802b4d689c narrows check of loaded files in conditional mutation
(#5166)
802b4d689c is described below
commit 802b4d689c32db84edd5370f9ea836d7b9e7c1a8
Author: Keith Turner <[email protected]>
AuthorDate: Thu Dec 12 13:25:31 2024 -0500
narrows check of loaded files in conditional mutation (#5166)
Bulk load fate code was reading a tablet's loaded flags, checking that the
files being loaded were not among them, and then making a conditional mutation
that required the set of bulk flags to be the same. Requiring the set to be the same
caused unnecessary collisions between bulk imports. Modified the
conditional check to require the loaded flags for the fate operation
to be absent.
---
.../accumulo/core/metadata/schema/Ample.java | 5 ++
.../metadata/ConditionalTabletMutatorImpl.java | 11 ++++
.../manager/tableOps/bulkVer2/LoadFiles.java | 4 +-
.../test/functional/AmpleConditionalWriterIT.java | 61 ++++++++++++++++++++++
4 files changed, 79 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 699ee319a1..b29b879d89 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -525,6 +525,11 @@ public interface Ample {
*/
ConditionalTabletMutator requireLessOrEqualsFiles(long limit);
+ /**
+ * Requires that a tablet not have these loaded flags set.
+ */
+ ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile>
files);
+
/**
* <p>
* Ample provides the following features on top of the conditional writer
to help automate
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 8cee60d96d..ff54603a82 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedFilesIterator;
import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
@@ -353,6 +354,16 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
return this;
}
+ @Override
+ public ConditionalTabletMutator
requireAbsentLoaded(Set<ReferencedTabletFile> files) {
+ Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
+ for (ReferencedTabletFile file : files) {
+ Condition c = new Condition(BulkFileColumnFamily.STR_NAME,
file.insert().getMetadata());
+ mutation.addCondition(c);
+ }
+ return this;
+ }
+
@Override
public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 7b0f494c19..5ff2ca3ec3 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -174,7 +174,6 @@ class LoadFiles extends ManagerRepo {
}
List<ColumnType> rsc = new ArrayList<>();
- rsc.add(LOCATION);
if (setTime) {
rsc.add(TIME);
}
@@ -232,7 +231,8 @@ class LoadFiles extends ManagerRepo {
}
var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
- .requireAbsentOperation().requireSame(tablet, LOADED,
requireSameCols);
+ .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
+ .requireSame(tablet, LOCATION, requireSameCols);
if (pauseLimit > 0) {
tabletMutator.requireLessOrEqualsFiles(pauseLimit);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index c1bc46921a..2cbb819a05 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -1874,4 +1874,65 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
assertEquals(time4, context.getAmple().readTablet(e1).getTime());
}
+
+ @Test
+ public void testRequireAbsentLoaded() {
+ var context = cluster.getServerContext();
+
+ var stf1 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+ var stf2 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
+ var stf3 = StoredTabletFile
+ .of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
+ var dfv = new DataFileValue(100, 100);
+
+ FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation()
+ .requireAbsentLoaded(Set.of(stf1.getTabletFile(),
stf2.getTabletFile()))
+ .putBulkFile(stf1.getTabletFile(),
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
+ .putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false);
+ assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+ }
+ assertEquals(Set.of(stf1, stf2),
context.getAmple().readTablet(e1).getFiles());
+ assertEquals(Map.of(stf1, fateId1, stf2, fateId1),
+ context.getAmple().readTablet(e1).getLoaded());
+
+ FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation()
+ .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
+ .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3,
dfv).submit(tm -> false);
+ assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+ }
+ assertEquals(Set.of(stf1, stf2, stf3),
context.getAmple().readTablet(e1).getFiles());
+ assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
+ context.getAmple().readTablet(e1).getLoaded());
+
+ // should fail because the loaded markers are present
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation()
+ .requireAbsentLoaded(Set.of(stf1.getTabletFile(),
stf2.getTabletFile()))
+ .putBulkFile(stf1.getTabletFile(),
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
+ .putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm ->
false);
+ assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+ }
+
+ // should fail because the loaded markers are present
+ try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+ ctmi.mutateTablet(e1).requireAbsentOperation()
+ .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
+ .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3,
dfv).putFlushId(99)
+ .submit(tm -> false);
+ assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+ }
+
+ assertEquals(Set.of(stf1, stf2, stf3),
context.getAmple().readTablet(e1).getFiles());
+ assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
+ context.getAmple().readTablet(e1).getLoaded());
+ assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
+ }
}