This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new edda158a54 Modified SetEncodingIterator to include Value (#4486)
edda158a54 is described below
commit edda158a54b1769066e8d9c2dfe97fecee419dd2
Author: Dave Marion <[email protected]>
AuthorDate: Fri May 10 09:21:24 2024 -0400
Modified SetEncodingIterator to include Value (#4486)
Renamed SetEqualityIterator to SetEncodingIterator. Added a
mandatory iterator option to determine whether the Value
should also be encoded for the equality checks that occur
for the Conditional mutations.
Fixes #3522
Co-authored-by: Christopher L. Shannon <[email protected]>
Co-authored-by: Dom G. <[email protected]>
---
.../metadata/ConditionalTabletMutatorImpl.java | 21 +++---
...alityIterator.java => SetEncodingIterator.java} | 75 +++++++++++++++++++---
...ratorTest.java => SetEncodingIteratorTest.java} | 69 +++++++++++++-------
.../test/functional/AmpleConditionalWriterIT.java | 18 +++++-
4 files changed, 142 insertions(+), 41 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 3e87241c6d..381b3e112e 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -55,10 +55,11 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMutatorBase;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.iterators.LocationExistsIterator;
import org.apache.accumulo.server.metadata.iterators.PresentIterator;
-import org.apache.accumulo.server.metadata.iterators.SetEqualityIterator;
+import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator;
import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator;
import com.google.common.base.Preconditions;
@@ -172,16 +173,18 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
case PREV_ROW:
throw new IllegalStateException("PREV_ROW already set from Extent");
case LOGS: {
- Condition c = SetEqualityIterator.createCondition(new
HashSet<>(tabletMetadata.getLogs()),
+ Condition c = SetEncodingIterator.createCondition(new
HashSet<>(tabletMetadata.getLogs()),
logEntry ->
logEntry.getColumnQualifier().toString().getBytes(UTF_8),
LogColumnFamily.NAME);
mutation.addCondition(c);
}
break;
case FILES: {
- // ELASTICITY_TODO compare values?
- Condition c =
SetEqualityIterator.createCondition(tabletMetadata.getFiles(),
- stf -> stf.getMetadata().getBytes(UTF_8),
DataFileColumnFamily.NAME);
+ Condition c =
+
SetEncodingIterator.createConditionWithVal(tabletMetadata.getFilesMap().entrySet(),
+ entry -> new
Pair<>(entry.getKey().getMetadata().getBytes(UTF_8),
+ entry.getValue().encode()),
+ DataFileColumnFamily.NAME);
mutation.addCondition(c);
}
break;
@@ -199,7 +202,7 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
break;
case ECOMP: {
Condition c =
-
SetEqualityIterator.createCondition(tabletMetadata.getExternalCompactions().keySet(),
+
SetEncodingIterator.createCondition(tabletMetadata.getExternalCompactions().keySet(),
ecid -> ecid.canonical().getBytes(UTF_8),
ExternalCompactionColumnFamily.NAME);
mutation.addCondition(c);
}
@@ -212,13 +215,13 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
}
break;
case LOADED: {
- Condition c =
SetEqualityIterator.createCondition(tabletMetadata.getLoaded().keySet(),
+ Condition c =
SetEncodingIterator.createCondition(tabletMetadata.getLoaded().keySet(),
stf -> stf.getMetadata().getBytes(UTF_8),
BulkFileColumnFamily.NAME);
mutation.addCondition(c);
}
break;
case COMPACTED: {
- Condition c =
SetEqualityIterator.createCondition(tabletMetadata.getCompacted(),
+ Condition c =
SetEncodingIterator.createCondition(tabletMetadata.getCompacted(),
fTid -> fTid.canonical().getBytes(UTF_8),
CompactedColumnFamily.NAME);
mutation.addCondition(c);
}
@@ -241,7 +244,7 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
break;
case USER_COMPACTION_REQUESTED: {
Condition c =
-
SetEqualityIterator.createCondition(tabletMetadata.getUserCompactionsRequested(),
+
SetEncodingIterator.createCondition(tabletMetadata.getUserCompactionsRequested(),
fTid -> fTid.canonical().getBytes(UTF_8),
UserCompactionRequestedColumnFamily.NAME);
mutation.addCondition(c);
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
similarity index 62%
rename from
server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java
rename to
server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
index c5314b4467..af878263d3 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.server.metadata.iterators;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -38,23 +40,36 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
import org.apache.hadoop.io.Text;
import com.google.common.base.Preconditions;
/**
- * This iterator exists to enable checking for set equality in a conditional
mutation. It allows
- * comparing a set in a client process to a set encoded in column qualifiers
within a tablet.
+ * This iterator exists to enable checking for set equality in a conditional
mutation. The
+ * createCondition methods allow the client to create conditions for specific
column families in a
+ * tablet's metadata. The conditions will check for equality based on the value
in the column
+ * qualifier or values in the column qualifier and Value.
+ *
+ * <h2>Options</h2>
+ * <ul>
+ * <li><b>concat.value:</b> This option must be supplied. If true, then the
bytes from the Value
+ * will be appended to the column qualifier bytes, separated by a null byte.</li>
+ * </ul>
*/
-public class SetEqualityIterator implements SortedKeyValueIterator<Key,Value> {
+public class SetEncodingIterator implements SortedKeyValueIterator<Key,Value> {
- // ELASTICITY_TODO unit test this iterator
+ public static final String CONCAT_VALUE = "concat.value";
+ private static final String VALUE_SEPARATOR = "\u0000";
+ private static final byte[] VALUE_SEPARATOR_BYTES =
VALUE_SEPARATOR.getBytes(UTF_8);
+ private static final int VALUE_SEPARATOR_BYTES_LENGTH =
VALUE_SEPARATOR_BYTES.length;
private SortedKeyValueIterator<Key,Value> source;
private Key startKey = null;
private Value topValue = null;
+ private boolean concat = false;
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
@@ -79,14 +94,21 @@ public class SetEqualityIterator implements
SortedKeyValueIterator<Key,Value> {
int count = 0;
while (source.hasTop()) {
+ final byte[] bytesToWrite;
byte[] ba = source.getTopKey().getColumnQualifierData().toArray();
- dos.writeInt(ba.length);
- dos.write(ba, 0, ba.length);
+ if (concat) {
+ byte[] val = source.getTopValue().get();
+ bytesToWrite = encodeKeyValue(ba, val);
+ } else {
+ bytesToWrite = ba;
+ }
+ dos.writeInt(bytesToWrite.length);
+ dos.write(bytesToWrite, 0, bytesToWrite.length);
source.next();
count++;
}
- // The lenght is written last so that buffering can be avoided in this
iterator.
+ // The length is written last so that buffering can be avoided in this
iterator.
dos.writeInt(count);
topValue = new Value(baos.toByteArray());
@@ -138,7 +160,13 @@ public class SetEqualityIterator implements
SortedKeyValueIterator<Key,Value> {
@Override
public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
IteratorEnvironment env) throws IOException {
+ String concat = options.get(CONCAT_VALUE);
+ if (concat == null || !(concat.equalsIgnoreCase("true") ||
concat.equalsIgnoreCase("false"))) {
+ throw new IllegalArgumentException(
+ CONCAT_VALUE + " option must be supplied with a value of 'true' or
'false'");
+ }
this.source = source;
+ this.concat = Boolean.parseBoolean(concat);
}
@Override
@@ -176,14 +204,45 @@ public class SetEqualityIterator implements
SortedKeyValueIterator<Key,Value> {
}
}
+ private static byte[] encodeKeyValue(byte[] key, byte[] val) {
+ var bytesToWrite = new byte[key.length + VALUE_SEPARATOR_BYTES_LENGTH +
val.length];
+ System.arraycopy(key, 0, bytesToWrite, 0, key.length);
+ System.arraycopy(VALUE_SEPARATOR_BYTES, 0, bytesToWrite, key.length,
+ VALUE_SEPARATOR_BYTES_LENGTH);
+ System.arraycopy(val, 0, bytesToWrite, key.length +
VALUE_SEPARATOR_BYTES_LENGTH, val.length);
+ return bytesToWrite;
+ }
+
private static final Text EMPTY = new Text();
+ /*
+ * Create a condition that will check the column qualifier values of the
rows in the tablets
+ * metadata with the matching family against a set of values produced by the
encoder function.
+ */
public static <T> Condition createCondition(Collection<T> set,
Function<T,byte[]> encoder,
Text family) {
Preconditions.checkArgument(set instanceof Set);
IteratorSetting is = new
IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO,
- SetEqualityIterator.class);
+ SetEncodingIterator.class);
+ is.addOption(SetEncodingIterator.CONCAT_VALUE, Boolean.toString(false));
return new Condition(family, EMPTY).setValue(encode((Set<T>) set,
encoder)).setIterators(is);
}
+ /*
+ * Create a condition that will check the column qualifier and Value values
of the rows in the
+ * tablet's metadata with the matching family against a set of values
produced by the encoder
+ * function.
+ */
+ public static <T> Condition createConditionWithVal(Collection<T> set,
+ Function<T,Pair<byte[],byte[]>> encoder, Text family) {
+ Preconditions.checkArgument(set instanceof Set);
+ IteratorSetting is = new
IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO,
+ SetEncodingIterator.class);
+ is.addOption(SetEncodingIterator.CONCAT_VALUE, Boolean.toString(true));
+ return new Condition(family, EMPTY).setValue(encode((Set<T>) set, s -> {
+ Pair<byte[],byte[]> kv = encoder.apply(s);
+ return encodeKeyValue(kv.getFirst(), kv.getSecond());
+ })).setIterators(is);
+ }
+
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java
b/server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java
similarity index 72%
rename from
server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java
rename to
server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java
index 3d6c11f9b8..90195493cd 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/SetEqualityIteratorTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/SetEncodingIteratorTest.java
@@ -21,10 +21,11 @@ package org.apache.accumulo.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.util.Collections;
-import java.util.Set;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -39,17 +40,20 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.server.metadata.iterators.SetEqualityIterator;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class SetEqualityIteratorTest {
+public class SetEncodingIteratorTest {
- private SetEqualityIterator setEqualityIterator;
- private SetEqualityIterator setEqualityIteratorNoFiles;
- private SetEqualityIterator setEqualityIteratorOneFile;
+ private TabletMetadata tmOneFile;
+ private TabletMetadata tmMultipleFiles;
+ private SetEncodingIterator setEqualityIterator;
+ private SetEncodingIterator setEqualityIteratorNoFiles;
+ private SetEncodingIterator setEqualityIteratorOneFile;
private SortedMapIterator sortedMapIterator;
private SortedMapIterator sortedMapIteratorNoFiles;
private SortedMapIterator sortedMapIteratorOneFile;
@@ -72,13 +76,13 @@ public class SetEqualityIteratorTest {
// Create tablet metadata with one file
StoredTabletFile singleFile =
new ReferencedTabletFile(new
Path("dfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert();
- TabletMetadata tmOneFile = TabletMetadata.builder(extent)
- .putFile(singleFile, new DataFileValue(100, 50)).putFlushId(8).build();
+ tmOneFile = TabletMetadata.builder(extent).putFile(singleFile, new
DataFileValue(100, 50))
+ .putFlushId(8).build();
// Create tablet metadata with multiple files
- TabletMetadata tmMultipleFiles = TabletMetadata.builder(extent)
- .putFile(file1, new DataFileValue(0, 0)).putFile(file2, new
DataFileValue(555, 23))
- .putFile(file3, new DataFileValue(234, 13)).putFlushId(6).build();
+ tmMultipleFiles = TabletMetadata.builder(extent).putFile(file1, new
DataFileValue(0, 0))
+ .putFile(file2, new DataFileValue(555, 23)).putFile(file3, new
DataFileValue(234, 13))
+ .putFlushId(6).build();
var extent2 = new KeyExtent(extent.tableId(), null, extent.endRow());
// create another tablet metadata using extent2 w/ diff files and add it
to sortedMap. This
@@ -107,12 +111,15 @@ public class SetEqualityIteratorTest {
sortedMapIteratorOneFile = new SortedMapIterator(sortedMapOneFile);
// Set the SortedMapIterator as the source for SetEqualityIterator
- setEqualityIterator = new SetEqualityIterator();
- setEqualityIterator.init(sortedMapIterator, Collections.emptyMap(), null);
- setEqualityIteratorNoFiles = new SetEqualityIterator();
- setEqualityIteratorNoFiles.init(sortedMapIteratorNoFiles,
Collections.emptyMap(), null);
- setEqualityIteratorOneFile = new SetEqualityIterator();
- setEqualityIteratorOneFile.init(sortedMapIteratorOneFile,
Collections.emptyMap(), null);
+ setEqualityIterator = new SetEncodingIterator();
+ setEqualityIterator.init(sortedMapIterator,
Map.of(SetEncodingIterator.CONCAT_VALUE, "true"),
+ null);
+ setEqualityIteratorNoFiles = new SetEncodingIterator();
+ setEqualityIteratorNoFiles.init(sortedMapIteratorNoFiles,
+ Map.of(SetEncodingIterator.CONCAT_VALUE, "false"), null);
+ setEqualityIteratorOneFile = new SetEncodingIterator();
+ setEqualityIteratorOneFile.init(sortedMapIteratorOneFile,
+ Map.of(SetEncodingIterator.CONCAT_VALUE, "true"), null);
}
@Test
@@ -129,7 +136,7 @@ public class SetEqualityIteratorTest {
// Asserting the result
assertEquals(new Key(tabletRow, family),
setEqualityIteratorNoFiles.getTopKey());
// The iterator should produce a value that is equal to the expected value
on the condition
- var condition = SetEqualityIterator.createCondition(Collections.emptySet(),
+ var condition = SetEncodingIterator.createCondition(Collections.emptySet(),
storedTabletFile -> ((StoredTabletFile)
storedTabletFile).getMetadata().getBytes(UTF_8),
family);
assertArrayEquals(condition.getValue().toArray(),
@@ -150,8 +157,10 @@ public class SetEqualityIteratorTest {
// Asserting the result
assertEquals(new Key(tabletRow, family),
setEqualityIteratorOneFile.getTopKey());
// The iterator should produce a value that is equal to the expected value
on the condition
- var condition =
SetEqualityIterator.createCondition(Collections.singleton(file1),
- storedTabletFile -> storedTabletFile.getMetadata().getBytes(UTF_8),
family);
+ var condition =
SetEncodingIterator.createConditionWithVal(tmOneFile.getFilesMap().entrySet(),
+ entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8),
+ entry.getValue().encode()),
+ family);
assertArrayEquals(condition.getValue().toArray(),
setEqualityIteratorOneFile.getTopValue().get());
}
@@ -170,10 +179,26 @@ public class SetEqualityIteratorTest {
// Asserting the result
assertEquals(new Key(tabletRow, family), setEqualityIterator.getTopKey());
// The iterator should produce a value that is equal to the expected value
on the condition
- var condition = SetEqualityIterator.createCondition(Set.of(file1, file2,
file3),
- storedTabletFile -> storedTabletFile.getMetadata().getBytes(UTF_8),
family);
+ var condition =
+
SetEncodingIterator.createConditionWithVal(tmMultipleFiles.getFilesMap().entrySet(),
+ entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8),
+ entry.getValue().encode()),
+ family);
assertArrayEquals(condition.getValue().toArray(),
setEqualityIterator.getTopValue().get());
}
+ @Test
+ public void testInvalidConcatValueOption() throws IOException {
+ SetEncodingIterator iter = new SetEncodingIterator();
+ iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "true"), null);
+ iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "false"), null);
+ assertThrows(IllegalArgumentException.class, () -> iter.init(null,
Map.of(), null));
+ assertThrows(IllegalArgumentException.class,
+ () -> iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, "yes"),
null));
+ assertThrows(IllegalArgumentException.class,
+ () -> iter.init(null, Map.of(SetEncodingIterator.CONCAT_VALUE, ""),
null));
+
+ }
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 4a05fad955..a077edf7be 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -319,6 +319,19 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
.putFile(stf4, new DataFileValue(0,
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
.submit(tm -> false);
results = ctmi.process();
+ // First attempt should fail because the dfvs were replaced in the test
+ // so the values of the files will not match
+ assertEquals(Status.REJECTED, results.get(e1).getStatus());
+
+ // Try again with the correct compacted data files
+ var compactedDv = new DataFileValue(0, 0);
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ tm5 = TabletMetadata.builder(e1).putFile(stf1,
compactedDv).putFile(stf2, compactedDv)
+ .putFile(stf3, compactedDv).build();
+ ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
+ .putFile(stf4, new DataFileValue(0,
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
+ .submit(tm -> false);
+ results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles());
@@ -332,7 +345,7 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
FateId fateId = FateId.from(type, UUID.randomUUID());
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
.putFile(stf5, new DataFileValue(0,
0)).putBulkFile(stf5.getTabletFile(), fateId)
- .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -342,7 +355,8 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
var stf6 = StoredTabletFile
.of(new
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf"));
ctmi = new ConditionalTabletsMutatorImpl(context);
- var tm7 = TabletMetadata.builder(e1).putFile(stf4, dfv).putFile(stf5,
dfv).build();
+ var tm7 =
+ TabletMetadata.builder(e1).putFile(stf4, compactedDv).putFile(stf5,
compactedDv).build();
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES)
.putFile(stf6, new DataFileValue(0,
0)).deleteFile(stf4).deleteFile(stf5)
.submit(tm -> false);