This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

commit 2be79180142b8b02647f02a838b52bb9ef4d5081
Author: Keith Turner <[email protected]>
AuthorDate: Sun Nov 17 21:46:08 2024 +0000

    limits tablets and offers bulk import as option for ingest
    
    Two new continuous ingest features are introduced in this change. First,
    options were added to limit the number of tablets written.  Second, an
    option was added to use bulk ingest instead of a batch writer.
    
    These features support running a test like the following.
    
     * create a continuous ingest table with 1000 tablets
     * start 100 continuous ingest clients
     * have each client continually bulk import data to 10 random tablets
    
    This test situation will create a lot of bulk import and subsequent
    compaction activity for Accumulo to handle.
    
    These changes add bulk import to the `cingest ingest` command.  There is
    an existing `cingest bulk` command that runs a map reduce job to create
    bulk files.  These changes do not remove the need for the existing map
    reduce job; they serve a different purpose.  The map reduce job can
    generate a really large amount of data to bulk import.  These changes
    allow generating lots of bulk imports w/ small amounts of data. These
    changes could never generate the amount of data for a single bulk import
    that the map reduce job could. The following is an example of a test
    scenario that could use both.
    
     * create a continuous ingest table with 1000 tablets
     * use map reduce bulk job to create an initial 10 billion entries in
       the table
     * start 100 continuous ingest clients
     * have each client continually bulk import data to 10 random tablets
     * stop clients after 12 hours and verify data
    
    cherry-pick of 2e19341d9611c6787d5a706143556b167f4bf721
---
 conf/accumulo-testing.properties                   |  11 +-
 .../org/apache/accumulo/testing/TestProps.java     |   8 +
 .../testing/continuous/BulkBatchWriter.java        | 183 ++++++++++++++++++++
 .../testing/continuous/ContinuousIngest.java       | 185 ++++++++++++++++++---
 .../accumulo/testing/continuous/ManySplits.java    |   7 +-
 5 files changed, 371 insertions(+), 23 deletions(-)

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 49aa611..7e4bc91 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -50,7 +50,7 @@ tserver.compaction.major.service.cs1.planner.opts.executors=\
 [{"name":"small","type":"internal","maxSize":"16M","numThreads":8},\
 {"name":"medium","type":"internal","maxSize":"128M","numThreads":4},\
 {"name":"large","type":"internal","numThreads":2}]
-  
+
 # Accumulo table properties to set when creating table
 test.ci.common.accumulo.table.props=\
 
table.compaction.dispatcher=org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher
 \
@@ -70,6 +70,10 @@ test.ci.ingest.row.max=9223372036854775807
 test.ci.ingest.max.cf=32767
 # Maximum number of random column qualifiers to generate
 test.ci.ingest.max.cq=32767
+# Maximum number of tablets that will be written to for a single flush.  For 
each iteration of flush the tablets to
+# write to are randomly chosen.  When this is set to Integer.MAX_VALUE no 
limiting is done.  This must be set to
+# a number in the range [2,Integer.MAX_VALUE].
+test.ci.ingest.max.tablets=2147483647
 # Optional visibilities (in CSV format) that if specified will be randomly 
selected by ingesters for
 # each linked list
 test.ci.ingest.visibilities=
@@ -97,6 +101,11 @@ test.ci.ingest.zipfian.min.size=0
 test.ci.ingest.zipfian.max.size=10000
 # Exponent of the Zipfian distribution
 test.ci.ingest.zipfian.exponent=1.5
+# If set to a path in hdfs will use bulk import instead of batch writer to 
ingest data
+test.ci.ingest.bulk.workdir=
+# When using bulk import to ingest data this determines how much memory can be 
used to buffer mutations before creating
+# rfiles and importing them.
+test.ci.ingest.bulk.memory.limit=512000000
 
 # Batch walker
 # ------------
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java 
b/src/main/java/org/apache/accumulo/testing/TestProps.java
index aa8e9e6..1e85181 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -106,6 +106,14 @@ public class TestProps {
   // The probability (between 0.0 and 1.0) that a set of entries will be 
deleted during continuous
   // ingest
   public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + 
"delete.probability";
+  // The max number of tablets that will be written to between flushes of the 
batch writer. Randomly
+  // selects the tablets when starting a new flush iteration.
+  public static final String CI_INGEST_MAX_TABLETS = CI_INGEST + "max.tablets";
+  // If set to a path in hdfs will use bulk import instead of batch writer to 
ingest data
+  public static final String CI_INGEST_BULK_WORK_DIR = CI_INGEST + 
"bulk.workdir";
+  // When using bulk import to ingest data this determines how much memory can 
be used to buffer
+  // mutations before creating rfiles and importing them.
+  public static final String CI_INGEST_BULK_MEM_LIMIT = CI_INGEST + 
"bulk.memory.limit";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java 
b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
new file mode 100644
index 0000000..fe50575
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+public class BulkBatchWriter implements BatchWriter {
+
+  private static final Logger log = 
LoggerFactory.getLogger(BulkBatchWriter.class);
+
+  private final List<Mutation> mutations = new ArrayList<>();
+  private final AccumuloClient client;
+  private final String tableName;
+  private final FileSystem fileSystem;
+  private final Path workPath;
+  private final long memLimit;
+  private final Supplier<SortedSet<Text>> splitSupplier;
+
+  private long memUsed;
+  private boolean closed = false;
+
+  public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem 
fileSystem,
+      Path workPath, long memLimit) {
+    this.client = client;
+    this.tableName = tableName;
+    this.fileSystem = fileSystem;
+    this.workPath = workPath;
+    this.memLimit = memLimit;
+    this.splitSupplier = Suppliers.memoizeWithExpiration(() -> {
+      try {
+        var splits = client.tableOperations().listSplits(tableName);
+        return new TreeSet<>(splits);
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+
+    }, 10, TimeUnit.MINUTES);
+  }
+
+  @Override
+  public synchronized void addMutation(Mutation mutation) throws 
MutationsRejectedException {
+    Preconditions.checkState(!closed);
+    mutation = new Mutation(mutation);
+    mutations.add(mutation);
+    memUsed += mutation.estimatedMemoryUsed();
+    if (memUsed > memLimit) {
+      flush();
+    }
+  }
+
+  @Override
+  public synchronized void addMutations(Iterable<Mutation> iterable)
+      throws MutationsRejectedException {
+    for (var mutation : iterable) {
+      addMutation(mutation);
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws MutationsRejectedException {
+    Preconditions.checkState(!closed);
+
+    try {
+      var splits = splitSupplier.get();
+
+      Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
+      fileSystem.mkdirs(tmpDir);
+      mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));
+
+      RFileWriter writer = null;
+      byte[] currEndRow = null;
+      int nextFileNameCounter = 0;
+
+      var loadPlanBuilder = LoadPlan.builder();
+
+      for (var mutation : mutations) {
+        if (writer == null
+            || (currEndRow != null && Arrays.compare(mutation.getRow(), 
currEndRow) > 0)) {
+          if (writer != null) {
+            writer.close();
+          }
+
+          var row = new Text(mutation.getRow());
+          var headSet = splits.headSet(row);
+          var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
+          var tailSet = splits.tailSet(row);
+          var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first();
+          currEndRow = tabletEndRow == null ? null : tabletEndRow.copyBytes();
+
+          String filename = String.format("bbw-%05d.rf", 
nextFileNameCounter++);
+          writer = RFile.newWriter().to(tmpDir + "/" + 
filename).withFileSystem(fileSystem).build();
+          loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, 
LoadPlan.RangeType.TABLE,
+              tabletPrevRow, tabletEndRow);
+
+          log.debug("Created new file {} for range {} {}", filename, 
tabletPrevRow, tabletEndRow);
+        }
+
+        for (var colUpdate : mutation.getUpdates()) {
+          var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
+              colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
+          if (colUpdate.hasTimestamp()) {
+            key.setTimestamp(colUpdate.getTimestamp());
+          }
+          if (colUpdate.isDeleted()) {
+            key.setDeleted(true);
+          }
+          writer.append(key, colUpdate.getValue());
+        }
+      }
+
+      if (writer != null) {
+        writer.close();
+      }
+
+      // TODO make table time configurable?
+      var loadPlan = loadPlanBuilder.build();
+
+      long t1 = System.nanoTime();
+      
client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan)
+          .tableTime(true).load();
+      long t2 = System.nanoTime();
+
+      log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} 
time:{}ms", tmpDir,
+          loadPlan.getDestinations().size(), mutations.size(), memUsed,
+          TimeUnit.NANOSECONDS.toMillis(t2 - t1));
+
+      fileSystem.delete(tmpDir, true);
+
+      mutations.clear();
+      memUsed = 0;
+    } catch (Exception e) {
+      closed = true;
+      throw new MutationsRejectedException(client, List.of(), Map.of(), 
List.of(), 1, e);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws MutationsRejectedException {
+    flush();
+    closed = true;
+  }
+}
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index bb93ca7..d775785 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -20,13 +20,18 @@ package org.apache.accumulo.testing.continuous;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -39,10 +44,15 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.accumulo.testing.util.FastFormat;
 import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 
 public class ContinuousIngest {
 
@@ -64,6 +74,137 @@ public class ContinuousIngest {
 
   private static RandomDataGenerator rnd;
 
+  public interface RandomGeneratorFactory extends Supplier<LongSupplier> {
+    static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient 
client, Random random) {
+      final long rowMin = env.getRowMin();
+      final long rowMax = env.getRowMax();
+      Properties testProps = env.getTestProperties();
+      final int maxTablets =
+          
Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_TABLETS));
+
+      if (maxTablets == Integer.MAX_VALUE) {
+        return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random);
+      } else {
+        var tableName = env.getAccumuloTableName();
+        Supplier<SortedSet<Text>> splitSupplier = 
Suppliers.memoizeWithExpiration(() -> {
+          try {
+            var splits = client.tableOperations().listSplits(tableName);
+            return new TreeSet<>(splits);
+          } catch (Exception e) {
+            throw new IllegalStateException(e);
+          }
+
+        }, 10, TimeUnit.MINUTES);
+        return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, 
maxTablets, splitSupplier,
+            random);
+      }
+    }
+  }
+
+  public static class MinMaxRandomGeneratorFactory implements 
RandomGeneratorFactory {
+    private final LongSupplier generator;
+
+    public MinMaxRandomGeneratorFactory(long rowMin, long rowMax, Random 
random) {
+      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
+      generator = () -> ContinuousIngest.genLong(rowMin, rowMax, random);
+    }
+
+    @Override
+    public LongSupplier get() {
+      return generator;
+    }
+  }
+
+  /**
+   * Chooses X random tablets and only generates random rows that fall within 
those tablets.
+   */
+  public static class MaxTabletsRandomGeneratorFactory implements 
RandomGeneratorFactory {
+    private final int maxTablets;
+    private final Supplier<SortedSet<Text>> splitSupplier;
+    private final Random random;
+    private final long minRow;
+    private final long maxRow;
+
+    public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int 
maxTablets,
+        Supplier<SortedSet<Text>> splitSupplier, Random random) {
+      // writing to a single tablet does not make much sense because this test 
it predicated on
+      // having rows in tablets point to rows in other tablet to detect errors
+      Preconditions.checkState(maxTablets > 1, "max tablets config must be > 
1");
+      this.maxTablets = maxTablets;
+      this.splitSupplier = splitSupplier;
+      this.random = random;
+      this.minRow = minRow;
+      this.maxRow = maxRow;
+    }
+
+    @Override
+    public LongSupplier get() {
+      var splits = splitSupplier.get();
+      if (splits.size() < maxTablets) {
+        // There are less tablets so generate within the tablet range
+        return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get();
+      } else {
+        long prev = minRow;
+        List<LongSupplier> allGenerators = new ArrayList<>(splits.size() + 1);
+        for (var split : splits) {
+          // splits are derived from inspecting rfile indexes and rfile 
indexes can shorten rows
+          // introducing non-hex chars so need to handle non-hex chars in the 
splits
+          // TODO this handling may not be correct, but it will not introduce 
errors but may cause
+          // writing a small amount of data to an extra tablet.
+          byte[] bytes = split.copyBytes();
+          int len = bytes.length;
+          int last = bytes.length - 1;
+          if (bytes[last] < '0') {
+            len = last;
+          } else if (bytes[last] > '9' && bytes[last] < 'a') {
+            bytes[last] = '9';
+          } else if (bytes[last] > 'f') {
+            bytes[last] = 'f';
+          }
+
+          var splitStr = new String(bytes, 0, len, UTF_8);
+          var splitNum = Long.parseLong(splitStr, 16) << (64 - 
splitStr.length() * 4);
+          allGenerators.add(new MinMaxRandomGeneratorFactory(prev, splitNum, 
random).get());
+          prev = splitNum;
+        }
+        allGenerators.add(new MinMaxRandomGeneratorFactory(prev, maxRow, 
random).get());
+
+        Collections.shuffle(allGenerators, random);
+        var generators = List.copyOf(allGenerators.subList(0, maxTablets));
+
+        return () -> {
+          // pick a generator for random tablet
+          var generator = generators.get(random.nextInt(generators.size()));
+          // pick a random long that falls within that tablet
+          return generator.getAsLong();
+        };
+      }
+    }
+  }
+
+  public interface BatchWriterFactory {
+    BatchWriter create(String tableName) throws TableNotFoundException;
+
+    static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) 
{
+      Properties testProps = env.getTestProperties();
+      final String bulkWorkDir = 
testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR);
+      if (bulkWorkDir == null || bulkWorkDir.isBlank()) {
+        return client::createBatchWriter;
+      } else {
+        try {
+          var conf = new Configuration();
+          var workDir = new Path(bulkWorkDir);
+          var filesystem = workDir.getFileSystem(conf);
+          var memLimit = 
Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
+          return tableName -> new BulkBatchWriter(client, tableName, 
filesystem, workDir, memLimit);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+  }
+
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
   }
@@ -112,8 +253,6 @@ public class ContinuousIngest {
 
       AccumuloClient client = env.getAccumuloClient();
 
-      final long rowMin = env.getRowMin();
-      final long rowMax = env.getRowMax();
       String tableName = env.getAccumuloTableName();
       Properties testProps = env.getTestProperties();
       final int maxColF = env.getMaxColF();
@@ -124,17 +263,17 @@ public class ContinuousIngest {
       final boolean checksum =
           
Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
 
-      doIngest(client, rowMin, rowMax, tableName, testProps, maxColF, maxColQ, 
numEntries, checksum,
-          random);
+      var randomFactory = RandomGeneratorFactory.create(env, client, random);
+      var batchWriterFactory = BatchWriterFactory.create(client, env);
+      doIngest(client, randomFactory, batchWriterFactory, tableName, 
testProps, maxColF, maxColQ,
+          numEntries, checksum, random);
     }
   }
 
-  protected static void doIngest(AccumuloClient client, long rowMin, long 
rowMax, String tableName,
-      Properties testProps, int maxColF, int maxColQ, long numEntries, boolean 
checksum,
-      Random random)
+  protected static void doIngest(AccumuloClient client, RandomGeneratorFactory 
randomFactory,
+      BatchWriterFactory batchWriterFactory, String tableName, Properties 
testProps, int maxColF,
+      int maxColQ, long numEntries, boolean checksum, Random random)
       throws TableNotFoundException, MutationsRejectedException, 
InterruptedException {
-    Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
-        "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 
     if (!client.tableOperations().exists(tableName)) {
       throw new TableNotFoundException(null, tableName,
@@ -181,26 +320,29 @@ public class ContinuousIngest {
     log.info("DELETES will occur with a probability of {}",
         String.format("%.02f", deleteProbability));
 
-    zipfianEnabled = 
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+    try (BatchWriter bw = batchWriterFactory.create(tableName)) {
+      zipfianEnabled =
+          
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
 
-    if (zipfianEnabled) {
-      minSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
-      maxSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
-      exponent = 
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
-      rnd = new RandomDataGenerator();
+      if (zipfianEnabled) {
+        minSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
+        maxSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
+        exponent = 
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
+        rnd = new RandomDataGenerator();
 
-      log.info("Zipfian distribution enabled with min size: {}, max size: {}, 
exponent: {}",
-          minSize, maxSize, exponent);
-    }
+        log.info("Zipfian distribution enabled with min size: {}, max size: 
{}, exponent: {}",
+            minSize, maxSize, exponent);
+      }
 
-    try (BatchWriter bw = client.createBatchWriter(tableName)) {
       out: while (true) {
         ColumnVisibility cv = getVisibility(random);
 
         // generate sets nodes that link to previous set of nodes
         for (int depth = 0; depth < maxDepth; depth++) {
+          // use the same random generator for each flush interval
+          LongSupplier randomRowGenerator = randomFactory.get();
           for (int index = 0; index < flushInterval; index++) {
-            long rowLong = genLong(rowMin, rowMax, random);
+            long rowLong = randomRowGenerator.getAsLong();
 
             byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 
1][index].row);
 
@@ -323,6 +465,9 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
   }
 
+  /**
+   * Generates a random long within the range [min,max)
+   */
   public static long genLong(long min, long max, Random r) {
     return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
   }
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
index 6a96574..b590e1a 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.testing.TestProps;
+import 
org.apache.accumulo.testing.continuous.ContinuousIngest.RandomGeneratorFactory;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,8 +109,10 @@ public class ManySplits {
           Map.of());
 
       log.info("Ingesting {} entries into first table, {}.", initialData, 
firstTable);
-      ContinuousIngest.doIngest(client, rowMin, rowMax, firstTable, testProps, 
maxColF, maxColQ,
-          initialData, false, random);
+      var randomFactory = RandomGeneratorFactory.create(env, client, random);
+      var batchWriterFactory = 
ContinuousIngest.BatchWriterFactory.create(client, env);
+      ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, 
firstTable, testProps,
+          maxColF, maxColQ, initialData, false, random);
 
       client.tableOperations().flush(firstTable);
 

Reply via email to