This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f4b1b9a087 compress data used by tablet management iterator (#5736)
f4b1b9a087 is described below
commit f4b1b9a0872867ab82d2b4660ed3c8c1245e3ede
Author: Dom G. <[email protected]>
AuthorDate: Tue Jul 15 11:38:07 2025 -0400
compress data used by tablet management iterator (#5736)
* compress data used by tablet management iterator
---
core/pom.xml | 6 +
.../org/apache/accumulo/core/conf/Property.java | 3 +
.../apache/accumulo/core/conf/PropertyType.java | 32 ++++
.../accumulo/core/conf/PropertyTypeTest.java | 33 +++-
pom.xml | 1 +
.../server/iterators/ServerIteratorOptions.java | 142 ++++++++++++++++
.../manager/state/TabletManagementIterator.java | 11 +-
.../manager/state/TabletManagementScanner.java | 2 +-
.../iterators/ServerIteratorOptionsTest.java | 185 +++++++++++++++++++++
.../functional/TabletManagementIteratorIT.java | 36 +++-
10 files changed, 432 insertions(+), 19 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 028db73ef3..696d4ff8d2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -193,6 +193,12 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<testResources>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8139e31ff3..0f47eee39e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -377,6 +377,9 @@ public enum Property {
GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0",
PropertyType.STRING,
"The local IP address to which this server should bind for sending and
receiving network traffic.",
"3.0.0"),
+
GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO("general.server.iter.opts.compression",
"none",
+ PropertyType.COMPRESSION_TYPE,
+ "Compression algorithm name to use for server-side iterator options
compression.", "2.1.4"),
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval",
"2m",
PropertyType.TIMEDURATION,
"Interval at which the Manager and TabletServer should verify their
server locks. A value of zero"
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 576e57ebb4..b874f3f69e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -34,8 +34,11 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
import org.apache.commons.lang3.Range;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Compressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,6 +135,9 @@ public enum PropertyType {
"A list of fully qualified java class names representing classes on the
classpath.\n"
+ "An example is 'java.lang.String', rather than 'String'"),
+ COMPRESSION_TYPE("compression type name", new ValidCompressionType(),
+ "One of the configured compression types."),
+
DURABILITY("durability", in(false, null, "default", "none", "log", "flush",
"sync"),
"One of 'none', 'log', 'flush' or 'sync'."),
@@ -285,6 +291,32 @@ public enum PropertyType {
|| (suffixCheck.test(x) && new Bounds(lowerBound,
upperBound).test(stripUnits.apply(x)));
}
+  /**
+   * Accepts a compression algorithm name only if it is one of the supported algorithms and a
+   * compressor can actually be obtained for it in this process. Some algorithms are listed but
+   * cannot create a compressor without extra runtime support (for example native libraries).
+   */
+  private static class ValidCompressionType implements Predicate<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ValidCompressionType.class);
+
+    @Override
+    public boolean test(String type) {
+      // explicit null check so the result never depends on the null-tolerance of the collection
+      // returned by getSupportedAlgorithms()
+      if (type == null || !Compression.getSupportedAlgorithms().contains(type)) {
+        return false;
+      }
+      CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(type);
+      final Compressor c;
+      try {
+        // probe: obtaining a compressor fails when the algorithm is unusable in this process
+        c = ca.getCompressor();
+      } catch (RuntimeException e) {
+        LOG.error("Error creating compressor for type {}", type, e);
+        return false;
+      }
+      // getCompressor() may yield null, so only hand back a compressor that was actually obtained
+      if (c != null) {
+        ca.returnCompressor(c);
+      }
+      return true;
+    }
+
+  }
+
private static final Pattern SUFFIX_REGEX = Pattern.compile("\\D*$"); //
match non-digits at end
private static final Function<String,String> stripUnits =
x -> x == null ? null : SUFFIX_REGEX.matcher(x.trim()).replaceAll("");
diff --git
a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index 708ead87c1..ab52ccc309 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.conf;
+import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -84,17 +85,21 @@ public class PropertyTypeTest extends WithTestNames {
}
private void valid(final String... args) {
- for (String s : args) {
- assertTrue(type.isValidFormat(s),
- s + " should be valid for " + PropertyType.class.getSimpleName() +
"." + type.name());
- }
+    // One Executable per argument, so assertAll reports every value that fails. A single
+    // Executable wrapping the whole loop would stop at the first failure.
+    assertAll(java.util.Arrays.stream(args).map(s -> () -> assertTrue(type.isValidFormat(s),
+        s + " should be valid for " + PropertyType.class.getSimpleName() + "." + type.name())));
}
private void invalid(final String... args) {
- for (String s : args) {
- assertFalse(type.isValidFormat(s),
- s + " should be invalid for " + PropertyType.class.getSimpleName() +
"." + type.name());
- }
+    // One Executable per argument, so assertAll reports every value that is wrongly accepted. A
+    // single Executable wrapping the whole loop would stop at the first failure.
+    assertAll(java.util.Arrays.stream(args).map(s -> () -> assertFalse(type.isValidFormat(s),
+        s + " should be invalid for " + PropertyType.class.getSimpleName() + "." + type.name())));
}
@Test
@@ -317,4 +322,16 @@ public class PropertyTypeTest extends WithTestNames {
invalid(null, "", "AL L", " ALL", "non import", " ");
}
+  @Test
+  public void testTypeCOMPRESSION_TYPE() {
+    // usable with the default build and test configuration
+    valid("none", "gz", "lz4", "snappy");
+
+    // never valid
+    invalid(null, "");
+
+    // Rejected in this test environment, although each can work at runtime when set up properly:
+    // - bzip2: the pure java implementation lacks Compressor/Decompressor, native code is needed
+    // - lzo: not bundled because of license issues, but may be added by the user
+    // - zstd: needs hadoop native libraries built with zstd support
+    invalid("bzip2", "lzo", "zstd");
+  }
+
}
diff --git a/pom.xml b/pom.xml
index bcc057f8f0..837e5d7cbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -952,6 +952,7 @@
<unused>biz.aQute.bnd:biz.aQute.bnd.annotation:jar:*</unused>
<unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
<unused>org.junit.platform:junit-platform-suite-engine:jar:*</unused>
+ <unused>org.lz4:lz4-java:jar:*</unused>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
new file mode 100644
index 0000000000..21cb36118d
--- /dev/null
+++
b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Base64;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Helpers for passing possibly large option values to server-side iterators, optionally
+ * compressed.
+ *
+ * <p>
+ * The writing side reads the algorithm from
+ * {@link Property#GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO} and records its name in the
+ * iterator's options under the key {@code __COMPRESSION_ALGO}. The reading side relies only on
+ * that recorded name and needs no configuration of its own.
+ *
+ * <p>
+ * All options written through this class onto a single {@link IteratorSetting} must use the same
+ * algorithm. Values produced by a serializer are stored Base64 encoded.
+ */
+public class ServerIteratorOptions {
+  static final String COMPRESSION_ALGO = "__COMPRESSION_ALGO";
+
+  private static final String NONE = new NoCompression().getName();
+
+  // buffer sizes handed to the compression and decompression streams
+  private static final int COMPRESSION_BUFFER_SIZE = 32 * 1024;
+  private static final int DECOMPRESSION_BUFFER_SIZE = 256 * 1024;
+
+  /** Writes an option value to the stream that is compressed and encoded into the option. */
+  public interface Serializer {
+    void serialize(DataOutput dataOutput) throws IOException;
+  }
+
+  /**
+   * Reads an option value back. {@code dataInput} is {@code null} when the option is not present,
+   * which lets the implementation decide what an absent option means.
+   */
+  public interface Deserializer<T> {
+    T deserialize(DataInputStream dataInput) throws IOException;
+  }
+
+  /**
+   * Adds a string option to {@code iteratorSetting} using the algorithm configured in
+   * {@code config}.
+   *
+   * <p>
+   * With algorithm {@code none}, the value is stored as-is and no copy is made. Otherwise it is
+   * written as length-prefixed UTF-8, then compressed and Base64 encoded.
+   *
+   * @throws IllegalArgumentException if options already added to {@code iteratorSetting} through
+   *         this class used a different compression algorithm
+   */
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, String value) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    setAlgo(iteratorSetting, algo);
+
+    if (algo.equals(NONE)) {
+      iteratorSetting.addOption(option, value);
+    } else {
+      compressOption(config, iteratorSetting, option, dataOutput -> {
+        byte[] bytes = value.getBytes(UTF_8);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+      });
+    }
+  }
+
+  /**
+   * Adds an option whose value is produced by {@code serializer}. The serializer's output is
+   * compressed with the configured algorithm and then Base64 encoded.
+   *
+   * @throws IllegalArgumentException if options already added to {@code iteratorSetting} through
+   *         this class used a different compression algorithm
+   * @throws UncheckedIOException if serializing or compressing fails
+   */
+  @VisibleForTesting
+  static void compressOption(final AccumuloConfiguration config, IteratorSetting iteratorSetting,
+      String option, Serializer serializer) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+
+    // Record or verify the algorithm before borrowing a compressor, so a mismatch failure cannot
+    // leave the compressor un-returned.
+    setAlgo(iteratorSetting, algo);
+
+    final Compressor c = ca.getCompressor();
+    try {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      // close the stream before reading baos so all pending compressed bytes are flushed into it
+      try (DataOutputStream dos =
+          new DataOutputStream(ca.createCompressionStream(baos, c, COMPRESSION_BUFFER_SIZE))) {
+        serializer.serialize(dos);
+      }
+      iteratorSetting.addOption(option, Base64.getEncoder().encodeToString(baos.toByteArray()));
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnCompressor(c);
+    }
+  }
+
+  /**
+   * Records {@code algo} as the option compression algorithm of {@code iteratorSetting}.
+   * If an algorithm is already recorded, verifies that it matches {@code algo}.
+   */
+  private static void setAlgo(IteratorSetting iteratorSetting, String algo) {
+    final Map<String,String> options = iteratorSetting.getOptions();
+    if (options.containsKey(COMPRESSION_ALGO)) {
+      final String existing = options.get(COMPRESSION_ALGO);
+      Preconditions.checkArgument(algo.equals(existing),
+          "Options of iterator %s were already compressed with %s, cannot also use %s",
+          iteratorSetting.getName(), existing, algo);
+    } else {
+      iteratorSetting.addOption(COMPRESSION_ALGO, algo);
+    }
+  }
+
+  /**
+   * Returns the string option written by
+   * {@link #compressOption(AccumuloConfiguration, IteratorSetting, String, String)}, or
+   * {@code null} if the option is absent. Absent options yield {@code null} whether or not
+   * compression is in use.
+   */
+  public static String decompressOption(Map<String,String> options, String option) {
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    if (algo.equals(NONE)) {
+      return options.get(option);
+    }
+
+    return decompressOption(options, option, dataInput -> {
+      if (dataInput == null) {
+        // option not present: match the uncompressed path instead of failing with an NPE
+        return null;
+      }
+      int len = dataInput.readInt();
+      byte[] data = new byte[len];
+      dataInput.readFully(data);
+      return new String(data, UTF_8);
+    });
+  }
+
+  /**
+   * Reads an option written by
+   * {@link #compressOption(AccumuloConfiguration, IteratorSetting, String, Serializer)}.
+   * The recorded algorithm is used, or {@code none} if no algorithm was recorded. If the option
+   * is absent, {@code deserializer} is invoked with {@code null}.
+   *
+   * @throws UncheckedIOException if decompressing or deserializing fails
+   */
+  @VisibleForTesting
+  static <T> T decompressOption(Map<String,String> options, String option,
+      Deserializer<T> deserializer) {
+    final String val = options.get(option);
+    if (val == null) {
+      try {
+        return deserializer.deserialize(null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    final String algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    final byte[] data = Base64.getDecoder().decode(val);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+    final Decompressor d = ca.getDecompressor();
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInputStream dis = new DataInputStream(
+            ca.createDecompressionStream(bais, d, DECOMPRESSION_BUFFER_SIZE))) {
+      return deserializer.deserialize(dis);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnDecompressor(d);
+    }
+  }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index ca5b0bafbc..e61c36d181 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.server.compaction.CompactionJobGenerator;
import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.iterators.ServerIteratorOptions;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
import org.apache.accumulo.server.split.SplitUtils;
@@ -155,13 +156,14 @@ public class TabletManagementIterator extends
WholeRowIterator {
}
}
- public static void configureScanner(final ScannerBase scanner,
+  /**
+   * Adds the {@link TabletManagementIterator} (priority 1001, name "ManagerTabletInfoIterator")
+   * to {@code scanner}. The serialized {@code tabletMgmtParams} are passed as an iterator option.
+   * That option is written through {@link ServerIteratorOptions#compressOption}, so it is
+   * compressed whenever {@code general.server.iter.opts.compression} in {@code conf} names an
+   * algorithm other than {@code none}.
+   *
+   * @param conf configuration supplying the iterator option compression algorithm
+   * @param scanner scanner to add the iterator to
+   * @param tabletMgmtParams parameters serialized into the iterator option
+   */
+ public static void configureScanner(AccumuloConfiguration conf, final
ScannerBase scanner,
final TabletManagementParameters tabletMgmtParams) {
// Note : if the scanner is ever made to fetch columns, then
TabletManagement.CONFIGURED_COLUMNS
// must be updated
IteratorSetting tabletChange =
new IteratorSetting(1001, "ManagerTabletInfoIterator",
TabletManagementIterator.class);
- tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION,
tabletMgmtParams.serialize());
+    // init() reads this back with ServerIteratorOptions.decompressOption
+ ServerIteratorOptions.compressOption(conf, tabletChange,
TABLET_GOAL_STATE_PARAMS_OPTION,
+ tabletMgmtParams.serialize());
scanner.addScanIterator(tabletChange);
}
@@ -178,8 +180,9 @@ public class TabletManagementIterator extends
WholeRowIterator {
IteratorEnvironment env) throws IOException {
super.init(source, options, env);
this.env = env;
- tabletMgmtParams =
-
TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION));
+ String rawParams =
+ ServerIteratorOptions.decompressOption(options,
TABLET_GOAL_STATE_PARAMS_OPTION);
+ tabletMgmtParams = TabletManagementParameters.deserialize(rawParams);
compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(),
tabletMgmtParams.getCompactionHints(),
tabletMgmtParams.getSteadyTime());
final AccumuloConfiguration conf = new
ConfigurationCopy(env.getPluginEnv().getConfiguration());
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
index 3d61dcc346..6eaec59931 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
@@ -90,7 +90,7 @@ public class TabletManagementScanner implements
ClosableIterator<TabletManagemen
throw new RuntimeException("Error obtaining locations for table: " +
tableName);
}
cleanable = CleanerUtil.unclosed(this, TabletManagementScanner.class,
closed, log, mdScanner);
- TabletManagementIterator.configureScanner(mdScanner, tmgmtParams);
+ TabletManagementIterator.configureScanner(context.getConfiguration(),
mdScanner, tmgmtParams);
mdScanner.setRanges(ranges);
iter = mdScanner.iterator();
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
new file mode 100644
index 0000000000..518de0ee46
--- /dev/null
+++
b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Strings;
+
+/**
+ * Tests for {@link ServerIteratorOptions}. Both string options and serializer-based options are
+ * covered, with compression disabled ({@code none}) and enabled ({@code gz}).
+ */
+public class ServerIteratorOptionsTest {
+
+  // the serialization tests create extents for rows 1 .. NUM_EXTENTS - 1
+  private static final int NUM_EXTENTS = 100_000;
+
+  // Rough lower bound on the uncompressed size: 4 bytes plus two 10 character rows per extent.
+  // The real encoding is larger.
+  private static final int EXPECTED_MIN_SIZE = NUM_EXTENTS * (4 + 10 + 10);
+
+  // reads back what extentsSerializer writes: a count followed by that many extents
+  private static final ServerIteratorOptions.Deserializer<Set<KeyExtent>> EXTENTS_DESERIALIZER =
+      dataInput -> {
+        int num = dataInput.readInt();
+        Set<KeyExtent> extents = new HashSet<>(num);
+        for (int i = 0; i < num; i++) {
+          extents.add(KeyExtent.readFrom(dataInput));
+        }
+        return extents;
+      };
+
+  /** Builds a configuration selecting the given iterator option compression algorithm. */
+  private static SiteConfiguration configWithAlgo(String algo) {
+    return SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), algo))
+        .build();
+  }
+
+  /** Creates contiguous extents of one table whose rows are space padded numbers. */
+  private static Set<KeyExtent> createExtents() {
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+    for (int i = 1; i < NUM_EXTENTS; i++) {
+      extents.add(new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1))));
+    }
+    return extents;
+  }
+
+  /** Returns a serializer that writes the number of extents followed by each extent. */
+  private static ServerIteratorOptions.Serializer extentsSerializer(Set<KeyExtent> extents) {
+    return dataOutput -> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    };
+  }
+
+  @Test
+  public void testStringNone() {
+    var aconf = configWithAlgo("none");
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    // the two values plus the recorded compression algorithm
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // should not copy string when compression type is none
+    assertSame(v1, iterSetting.getOptions().get("k1"));
+    assertSame(v2, iterSetting.getOptions().get("k2"));
+    assertEquals("none", iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // reading back should also hand out the stored instances without copying
+    assertSame(v1, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertSame(v2, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testStringCompress() {
+    var aconf = configWithAlgo("gz");
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    assertEquals(100_000, v1.length());
+    assertEquals(110_000, v2.length());
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    // the two values plus the recorded compression algorithm
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // the stored value should be much smaller
+    assertTrue(iterSetting.getOptions().get("k1").length() < 1000);
+    assertTrue(iterSetting.getOptions().get("k2").length() < 1000);
+    assertEquals("gz", iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // decompression must restore the original strings
+    assertEquals(v1, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertEquals(v2, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testSerializeNone() {
+    var aconf = configWithAlgo("none");
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+    Set<KeyExtent> extents = createExtents();
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", extentsSerializer(extents));
+
+    // uncompressed and Base64 encoded, so larger than the minimum raw size
+    assertTrue(iterSetting.getOptions().get("k1").length() > EXPECTED_MIN_SIZE);
+
+    Set<KeyExtent> extents2 = ServerIteratorOptions.decompressOption(iterSetting.getOptions(),
+        "k1", EXTENTS_DESERIALIZER);
+
+    assertEquals(extents, extents2);
+    assertNotSame(extents, extents2);
+  }
+
+  @Test
+  public void testSerializeGz() {
+    var aconf = configWithAlgo("gz");
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+    Set<KeyExtent> extents = createExtents();
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", extentsSerializer(extents));
+
+    // should be smaller than the expected min size because of compression
+    assertTrue(iterSetting.getOptions().get("k1").length() < EXPECTED_MIN_SIZE);
+
+    Set<KeyExtent> extents2 = ServerIteratorOptions.decompressOption(iterSetting.getOptions(),
+        "k1", EXTENTS_DESERIALIZER);
+
+    assertEquals(extents, extents2);
+  }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 5016070509..075bc33a28 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -23,6 +23,7 @@ import static
org.apache.accumulo.core.manager.state.TabletManagement.Management
import static
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_RECOVERY;
import static
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_SPLITTING;
import static
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction.NEEDS_VOLUME_REPLACEMENT;
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -58,6 +59,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -93,12 +95,16 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.manager.state.TabletManagementIterator;
import org.apache.accumulo.server.manager.state.TabletManagementParameters;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,17 +114,33 @@ import com.google.common.collect.Sets;
* Test to ensure that the {@link TabletManagementIterator} properly skips
over tablet information
* in the metadata table when there is no work to be done on the tablet (see
ACCUMULO-3580)
*/
+@Tag(MINI_CLUSTER_ONLY)
public class TabletManagementIteratorIT extends AccumuloClusterHarness {
private final static Logger log =
LoggerFactory.getLogger(TabletManagementIteratorIT.class);
+  // Compression algorithm for the current parameterized invocation. It is assigned by test()
+  // before super.setupCluster() and read by configureMiniCluster.
+ private String cType = null;
+
+  // NOTE(review): presumably the per-test timeout enforced by the harness. Each parameterized
+  // invocation now starts its own cluster, so this budget also covers cluster startup.
@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(3);
}
- @Test
- public void test() throws AccumuloException, AccumuloSecurityException,
TableExistsException,
- TableNotFoundException, IOException {
+ @Override
+ public void setupCluster() {
+    // Intentionally does not start the cluster before each test. Each parameterized invocation
+    // first records its compression type in cType and then calls super.setupCluster() itself.
+ }
+
+  // Applies the compression type chosen by the running invocation (cType) to the mini cluster.
+  // NOTE(review): presumably called by the harness during super.setupCluster(), i.e. after
+  // cType has been assigned. Confirm, since a null cType would be passed through as-is.
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setProperty(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO,
cType);
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"none", "gz", "snappy"})
+ public void test(String compressionType) throws Exception {
+
+ cType = compressionType;
+ super.setupCluster();
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
@@ -432,7 +454,8 @@ public class TabletManagementIteratorIT extends
AccumuloClusterHarness {
Map<KeyExtent,Set<TabletManagement.ManagementAction>> results = new
HashMap<>();
List<KeyExtent> resultList = new ArrayList<>();
try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
- TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
+ TabletManagementIterator.configureScanner(((ClientContext)
client).getConfiguration(),
+ scanner, tabletMgmtParams);
scanner.updateScanIteratorOption("tabletChange", "debug", "1");
for (Entry<Key,Value> e : scanner) {
if (e != null) {
@@ -454,8 +477,9 @@ public class TabletManagementIteratorIT extends
AccumuloClusterHarness {
// tablet mgmt iterator works with that.
private void testContinueScan(AccumuloClient client, String table,
TabletManagementParameters tabletMgmtParams) throws
TableNotFoundException {
+ AccumuloConfiguration conf = ((ClientContext) client).getConfiguration();
try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
- TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
+ TabletManagementIterator.configureScanner(conf, scanner,
tabletMgmtParams);
List<Entry<Key,Value>> entries1 = new ArrayList<>();
scanner.forEach(e -> entries1.add(e));
assertTrue(entries1.size() > 1);