This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f4b1b9a087 compress data used by tablet management iterator (#5736)
f4b1b9a087 is described below

commit f4b1b9a0872867ab82d2b4660ed3c8c1245e3ede
Author: Dom G. <[email protected]>
AuthorDate: Tue Jul 15 11:38:07 2025 -0400

    compress data used by tablet management iterator (#5736)
    
    * compress data used by tablet management iterator
---
 core/pom.xml                                       |   6 +
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../apache/accumulo/core/conf/PropertyType.java    |  32 ++++
 .../accumulo/core/conf/PropertyTypeTest.java       |  33 +++-
 pom.xml                                            |   1 +
 .../server/iterators/ServerIteratorOptions.java    | 142 ++++++++++++++++
 .../manager/state/TabletManagementIterator.java    |  11 +-
 .../manager/state/TabletManagementScanner.java     |   2 +-
 .../iterators/ServerIteratorOptionsTest.java       | 185 +++++++++++++++++++++
 .../functional/TabletManagementIteratorIT.java     |  36 +++-
 10 files changed, 432 insertions(+), 19 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 028db73ef3..696d4ff8d2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -193,6 +193,12 @@
       <artifactId>junit-jupiter-params</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <testResources>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8139e31ff3..0f47eee39e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -377,6 +377,9 @@ public enum Property {
   GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", 
PropertyType.STRING,
       "The local IP address to which this server should bind for sending and 
receiving network traffic.",
       "3.0.0"),
+  
GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO("general.server.iter.opts.compression",
 "none",
+      PropertyType.COMPRESSION_TYPE,
+      "Compression algorithm name to use for server-side iterator options 
compression.", "2.1.4"),
   
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval",
 "2m",
       PropertyType.TIMEDURATION,
       "Interval at which the Manager and TabletServer should verify their 
server locks. A value of zero"
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java 
b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 576e57ebb4..b874f3f69e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -34,8 +34,11 @@ import java.util.stream.Stream;
 
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
 import org.apache.commons.lang3.Range;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Compressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,6 +135,9 @@ public enum PropertyType {
       "A list of fully qualified java class names representing classes on the 
classpath.\n"
           + "An example is 'java.lang.String', rather than 'String'"),
 
+  COMPRESSION_TYPE("compression type name", new ValidCompressionType(),
+      "One of the configured compression types."),
+
   DURABILITY("durability", in(false, null, "default", "none", "log", "flush", 
"sync"),
       "One of 'none', 'log', 'flush' or 'sync'."),
 
@@ -285,6 +291,32 @@ public enum PropertyType {
         || (suffixCheck.test(x) && new Bounds(lowerBound, 
upperBound).test(stripUnits.apply(x)));
   }
 
+  private static class ValidCompressionType implements Predicate<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ValidCompressionType.class);
+
+    @Override
+    public boolean test(String type) {
+      if (!Compression.getSupportedAlgorithms().contains(type)) {
+        return false;
+      }
+      CompressionAlgorithm ca = 
Compression.getCompressionAlgorithmByName(type);
+      Compressor c = null;
+      try {
+        c = ca.getCompressor();
+        return true;
+      } catch (RuntimeException e) {
+        LOG.error("Error creating compressor for type {}", type, e);
+        return false;
+      } finally {
+        if (c != null) {
+          ca.returnCompressor(c);
+        }
+      }
+    }
+
+  }
+
   private static final Pattern SUFFIX_REGEX = Pattern.compile("\\D*$"); // 
match non-digits at end
   private static final Function<String,String> stripUnits =
       x -> x == null ? null : SUFFIX_REGEX.matcher(x.trim()).replaceAll("");
diff --git 
a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java 
b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index 708ead87c1..ab52ccc309 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.core.conf;
 
+import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -84,17 +85,21 @@ public class PropertyTypeTest extends WithTestNames {
   }
 
   private void valid(final String... args) {
-    for (String s : args) {
-      assertTrue(type.isValidFormat(s),
-          s + " should be valid for " + PropertyType.class.getSimpleName() + 
"." + type.name());
-    }
+    assertAll(() -> {
+      for (String s : args) {
+        assertTrue(type.isValidFormat(s),
+            s + " should be valid for " + PropertyType.class.getSimpleName() + 
"." + type.name());
+      }
+    });
   }
 
   private void invalid(final String... args) {
-    for (String s : args) {
-      assertFalse(type.isValidFormat(s),
-          s + " should be invalid for " + PropertyType.class.getSimpleName() + 
"." + type.name());
-    }
+    assertAll(() -> {
+      for (String s : args) {
+        assertFalse(type.isValidFormat(s),
+            s + " should be invalid for " + PropertyType.class.getSimpleName() 
+ "." + type.name());
+      }
+    });
   }
 
   @Test
@@ -317,4 +322,16 @@ public class PropertyTypeTest extends WithTestNames {
     invalid(null, "", "AL L", " ALL", "non import", "     ");
   }
 
+  @Test
+  public void testTypeCOMPRESSION_TYPE() {
+    valid("none", "gz", "lz4", "snappy");
+    // The following are valid at runtime with the correct configuration
+    //
+    // bzip2 java implementation does not implement Compressor/Decompressor, 
requires native
+    // lzo not included in implementation due to license issues, but can be 
added by user
+    // zstd requires hadoop native libraries built with zstd support
+    //
+    invalid(null, "", "bzip2", "lzo", "zstd");
+  }
+
 }
diff --git a/pom.xml b/pom.xml
index bcc057f8f0..837e5d7cbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -952,6 +952,7 @@
                 <unused>biz.aQute.bnd:biz.aQute.bnd.annotation:jar:*</unused>
                 <unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
                 
<unused>org.junit.platform:junit-platform-suite-engine:jar:*</unused>
+                <unused>org.lz4:lz4-java:jar:*</unused>
               </ignoredUnusedDeclaredDependencies>
             </configuration>
           </execution>
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
 
b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
new file mode 100644
index 0000000000..21cb36118d
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Base64;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class ServerIteratorOptions {
+  static final String COMPRESSION_ALGO = "__COMPRESSION_ALGO";
+
+  private static final String NONE = new NoCompression().getName();
+
+  public interface Serializer {
+    void serialize(DataOutput dataOutput) throws IOException;
+  }
+
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, String value) {
+    final String algo = 
config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    setAlgo(iteratorSetting, algo);
+
+    if (algo.equals(NONE)) {
+      iteratorSetting.addOption(option, value);
+    } else {
+      compressOption(config, iteratorSetting, option, dataOutput -> {
+        byte[] bytes = value.getBytes(UTF_8);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+      });
+    }
+  }
+
+  @VisibleForTesting
+  static void compressOption(final AccumuloConfiguration config, 
IteratorSetting iteratorSetting,
+      String option, Serializer serializer) {
+    final String algo = 
config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    final CompressionAlgorithm ca = 
Compression.getCompressionAlgorithmByName(algo);
+    final Compressor c = ca.getCompressor();
+
+    setAlgo(iteratorSetting, algo);
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DataOutputStream dos =
+        new DataOutputStream(ca.createCompressionStream(baos, c, 32 * 1024))) {
+      serializer.serialize(dos);
+      dos.close();
+      var val = Base64.getEncoder().encodeToString(baos.toByteArray());
+      iteratorSetting.addOption(option, val);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnCompressor(c);
+    }
+  }
+
+  private static void setAlgo(IteratorSetting iteratorSetting, String algo) {
+    if (iteratorSetting.getOptions().containsKey(COMPRESSION_ALGO)) {
+      
Preconditions.checkArgument(iteratorSetting.getOptions().get(COMPRESSION_ALGO).equals(algo));
+    } else {
+      iteratorSetting.addOption(COMPRESSION_ALGO, algo);
+    }
+  }
+
+  public interface Deserializer<T> {
+    T deserialize(DataInputStream dataInput) throws IOException;
+  }
+
+  public static String decompressOption(Map<String,String> options, String 
option) {
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    if (algo.equals(NONE)) {
+      return options.get(option);
+    }
+
+    return decompressOption(options, option, dataInput -> {
+      int len = dataInput.readInt();
+      byte[] data = new byte[len];
+      dataInput.readFully(data);
+      return new String(data, UTF_8);
+    });
+  }
+
+  @VisibleForTesting
+  static <T> T decompressOption(Map<String,String> options, String option,
+      Deserializer<T> deserializer) {
+    var val = options.get(option);
+    if (val == null) {
+      try {
+        return deserializer.deserialize(null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    final byte[] data = Base64.getDecoder().decode(val);
+    final CompressionAlgorithm ca = 
Compression.getCompressionAlgorithmByName(algo);
+    final Decompressor d = ca.getDecompressor();
+    try (ByteArrayInputStream baos = new ByteArrayInputStream(data); 
DataInputStream dais =
+        new DataInputStream(ca.createDecompressionStream(baos, d, 256 * 
1024))) {
+      return deserializer.deserialize(dais);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnDecompressor(d);
+    }
+  }
+}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index ca5b0bafbc..e61c36d181 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.iterators.ServerIteratorOptions;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.apache.accumulo.server.split.SplitUtils;
@@ -155,13 +156,14 @@ public class TabletManagementIterator extends 
WholeRowIterator {
     }
   }
 
-  public static void configureScanner(final ScannerBase scanner,
+  public static void configureScanner(AccumuloConfiguration conf, final 
ScannerBase scanner,
       final TabletManagementParameters tabletMgmtParams) {
     // Note : if the scanner is ever made to fetch columns, then 
TabletManagement.CONFIGURED_COLUMNS
     // must be updated
     IteratorSetting tabletChange =
         new IteratorSetting(1001, "ManagerTabletInfoIterator", 
TabletManagementIterator.class);
-    tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, 
tabletMgmtParams.serialize());
+    ServerIteratorOptions.compressOption(conf, tabletChange, 
TABLET_GOAL_STATE_PARAMS_OPTION,
+        tabletMgmtParams.serialize());
     scanner.addScanIterator(tabletChange);
   }
 
@@ -178,8 +180,9 @@ public class TabletManagementIterator extends 
WholeRowIterator {
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
     this.env = env;
-    tabletMgmtParams =
-        
TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION));
+    String rawParams =
+        ServerIteratorOptions.decompressOption(options, 
TABLET_GOAL_STATE_PARAMS_OPTION);
+    tabletMgmtParams = TabletManagementParameters.deserialize(rawParams);
     compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(),
         tabletMgmtParams.getCompactionHints(), 
tabletMgmtParams.getSteadyTime());
     final AccumuloConfiguration conf = new 
ConfigurationCopy(env.getPluginEnv().getConfiguration());
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
index 3d61dcc346..6eaec59931 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
@@ -90,7 +90,7 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
       throw new RuntimeException("Error obtaining locations for table: " + 
tableName);
     }
     cleanable = CleanerUtil.unclosed(this, TabletManagementScanner.class, 
closed, log, mdScanner);
-    TabletManagementIterator.configureScanner(mdScanner, tmgmtParams);
+    TabletManagementIterator.configureScanner(context.getConfiguration(), 
mdScanner, tmgmtParams);
     mdScanner.setRanges(ranges);
     iter = mdScanner.iterator();
   }
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
new file mode 100644
index 0000000000..518de0ee46
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Strings;
+
+public class ServerIteratorOptionsTest {
+  @Test
+  public void testStringNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            
Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), 
"none"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", 
"TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // should not copy string when compression type is none
+    assertSame(v1, iterSetting.getOptions().get("k1"));
+    assertSame(v2, iterSetting.getOptions().get("k2"));
+    assertEquals("none", 
iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // should not copy string when compression type is none
+    assertSame(v1, 
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertSame(v2, 
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testStringCompress() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            
Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), 
"gz"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    assertEquals(100_000, v1.length());
+    assertEquals(110_000, v2.length());
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", 
"TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // the stored value should be much smaller
+    assertTrue(iterSetting.getOptions().get("k1").length() < 1000);
+    assertTrue(iterSetting.getOptions().get("k2").length() < 1000);
+    assertEquals("gz", 
iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // decompressed values should round-trip back to the original strings
+
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertEquals(v2, 
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testSerializeNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            
Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), 
"none"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", 
"TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput 
-> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data, will be larger
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    assertTrue(iterSetting.getOptions().get("k1").length() > expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", 
dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<>(num);
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+    assertNotSame(extents, extents2);
+  }
+
+  @Test
+  public void testSerializeGz() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            
Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), 
"gz"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", 
"TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput 
-> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    // should be smaller than the expected min size because of compression
+    assertTrue(iterSetting.getOptions().get("k1").length() < expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", 
dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<>(num);
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 5016070509..075bc33a28 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -23,6 +23,7 @@ import static 
org.apache.accumulo.core.manager.state.TabletManagement.Management
 import static 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_RECOVERY;
 import static 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_SPLITTING;
 import static 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_VOLUME_REPLACEMENT;
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -58,6 +59,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -93,12 +95,16 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.accumulo.server.manager.state.TabletManagementParameters;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,17 +114,33 @@ import com.google.common.collect.Sets;
  * Test to ensure that the {@link TabletManagementIterator} properly skips 
over tablet information
  * in the metadata table when there is no work to be done on the tablet (see 
ACCUMULO-3580)
  */
+@Tag(MINI_CLUSTER_ONLY)
 public class TabletManagementIteratorIT extends AccumuloClusterHarness {
   private final static Logger log = 
LoggerFactory.getLogger(TabletManagementIteratorIT.class);
 
+  private String cType = null;
+
   @Override
   protected Duration defaultTimeout() {
     return Duration.ofMinutes(3);
   }
 
-  @Test
-  public void test() throws AccumuloException, AccumuloSecurityException, 
TableExistsException,
-      TableNotFoundException, IOException {
+  @Override
+  public void setupCluster() {
+    // Overriding to *not* start cluster before test, we are going to do it 
manually
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO, 
cType);
+  }
+
+  @ParameterizedTest()
+  @ValueSource(strings = {"none", "gz", "snappy"})
+  public void test(String compressionType) throws Exception {
+
+    cType = compressionType;
+    super.setupCluster();
 
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 
@@ -432,7 +454,8 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
     Map<KeyExtent,Set<TabletManagement.ManagementAction>> results = new 
HashMap<>();
     List<KeyExtent> resultList = new ArrayList<>();
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
-      TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
+      TabletManagementIterator.configureScanner(((ClientContext) 
client).getConfiguration(),
+          scanner, tabletMgmtParams);
       scanner.updateScanIteratorOption("tabletChange", "debug", "1");
       for (Entry<Key,Value> e : scanner) {
         if (e != null) {
@@ -454,8 +477,9 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
   // tablet mgmt iterator works with that.
   private void testContinueScan(AccumuloClient client, String table,
       TabletManagementParameters tabletMgmtParams) throws 
TableNotFoundException {
+    AccumuloConfiguration conf = ((ClientContext) client).getConfiguration();
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
-      TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
+      TabletManagementIterator.configureScanner(conf, scanner, 
tabletMgmtParams);
       List<Entry<Key,Value>> entries1 = new ArrayList<>();
       scanner.forEach(e -> entries1.add(e));
       assertTrue(entries1.size() > 1);

Reply via email to