This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/2.1 by this push:
new e9e5e7a Add zipfian distribution option to continuous ingest (#276)
e9e5e7a is described below
commit e9e5e7a8ead4ccf74f074da2974d4720019bbb72
Author: Dom G. <[email protected]>
AuthorDate: Tue Dec 10 15:38:23 2024 -0500
Add zipfian distribution option to continuous ingest (#276)
* adds an option to vary the size of entries that are ingested via
continuous ingest. The sizes of the entries follow a Zipfian distribution.
---
conf/accumulo-testing.properties | 8 ++++
.../testing/continuous/ContinuousIngest.java | 51 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 36f618c..49aa611 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -89,6 +89,14 @@ test.ci.ingest.pause.duration.max=120
# The probability (between 0.0 and 1.0) that a set of entries will be deleted
during continuous ingest
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1
+# Enables a Zipfian distribution for value size. If set to true, each value will
have a run of random bytes inserted into it, whose length is drawn from a Zipfian
distribution.
+test.ci.ingest.zipfian.enabled=true
+# Minimum size (in bytes) of the random data inserted into the value when Zipfian distribution is enabled
+test.ci.ingest.zipfian.min.size=0
+# Maximum size (in bytes) of the random data inserted into the value when Zipfian distribution is enabled
+test.ci.ingest.zipfian.max.size=10000
+# Exponent of the Zipfian distribution
+test.ci.ingest.zipfian.exponent=1.5
# Batch walker
# ------------
diff --git
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 1bb32a5..bb93ca7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.util.FastFormat;
+import org.apache.commons.math3.random.RandomDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,13 @@ public class ContinuousIngest {
private static int pauseMin;
private static int pauseMax;
+ private static boolean zipfianEnabled;
+ private static int minSize;
+ private static int maxSize;
+ private static double exponent;
+
+ private static RandomDataGenerator rnd;
+
private static ColumnVisibility getVisibility(Random rand) {
return visibilities.get(rand.nextInt(visibilities.size()));
}
@@ -173,6 +181,18 @@ public class ContinuousIngest {
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));
+ zipfianEnabled =
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+
+ if (zipfianEnabled) {
+ minSize =
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
+ maxSize =
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
+ exponent =
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
+ rnd = new RandomDataGenerator();
+
+ log.info("Zipfian distribution enabled with min size: {}, max size: {},
exponent: {}",
+ minSize, maxSize, exponent);
+ }
+
try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(random);
@@ -317,18 +337,37 @@ public class ContinuousIngest {
public static byte[] createValue(byte[] ingestInstanceId, long
entriesWritten, byte[] prevRow,
Checksum cksum) {
- int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 :
prevRow.length) + 3;
+ final int numOfSeparators = 4;
+ int dataLen =
+ ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length)
+ numOfSeparators;
if (cksum != null)
dataLen += 8;
+
+ int zipfLength = 0;
+ if (zipfianEnabled) {
+ // add the length of the zipfian data to the value
+ int range = maxSize - minSize;
+ zipfLength = rnd.nextZipf(range, exponent) + minSize;
+ dataLen += zipfLength;
+ }
+
byte[] val = new byte[dataLen];
+
+ // add the ingest instance id to the value
System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
int index = ingestInstanceId.length;
+
val[index++] = ':';
+
+ // add the count of entries written to the value
int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16,
16, EMPTY_BYTES);
if (added != 16)
throw new RuntimeException(" " + added);
index += 16;
+
val[index++] = ':';
+
+ // add the previous row to the value
if (prevRow != null) {
System.arraycopy(prevRow, 0, val, index, prevRow.length);
index += prevRow.length;
@@ -336,6 +375,16 @@ public class ContinuousIngest {
val[index++] = ':';
+ if (zipfianEnabled) {
+ // add random data to the value of length zipfLength
+ for (int i = 0; i < zipfLength; i++) {
+ val[index++] = (byte) rnd.nextInt(0, 256);
+ }
+
+ val[index++] = ':';
+ }
+
+ // add the checksum to the value
if (cksum != null) {
cksum.update(val, 0, index);
cksum.getValue();