This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 136e5fd222 Add export, list, import sub-commands for nodetool
compressiondictionary
136e5fd222 is described below
commit 136e5fd222a087d6104d9dce307d759d1f6a5d3e
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Tue Oct 21 17:50:48 2025 +0200
Add export, list, import sub-commands for nodetool compressiondictionary
patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-20941
---
CHANGES.txt | 1 +
.../pages/managing/operating/compression.adoc | 14 +
.../db/compression/CompressionDictionary.java | 70 ++++-
.../CompressionDictionaryDetailsTabularData.java | 294 ++++++++++++++++++++
.../CompressionDictionaryEventHandler.java | 2 +-
.../compression/CompressionDictionaryManager.java | 83 ++++++
.../CompressionDictionaryManagerMBean.java | 41 ++-
.../db/compression/ZstdCompressionDictionary.java | 17 ++
.../db/compression/ZstdDictionaryTrainer.java | 3 +-
.../schema/SystemDistributedKeyspace.java | 76 ++++-
src/java/org/apache/cassandra/tools/NodeProbe.java | 75 ++++-
.../tools/nodetool/CompressionDictionary.java | 122 --------
.../CompressionDictionaryCommandGroup.java | 308 +++++++++++++++++++++
.../cassandra/tools/nodetool/NodetoolCommand.java | 2 +-
test/resources/nodetool/help/compressiondictionary | 27 ++
...siondictionary => compressiondictionary$export} | 35 +--
...siondictionary => compressiondictionary$import} | 27 +-
...essiondictionary => compressiondictionary$list} | 28 +-
.../CompressionDictionaryDataObjectTest.java | 206 ++++++++++++++
.../cassandra/io/sstable/CQLSSTableWriterTest.java | 51 +---
...stributedKeyspaceCompressionDictionaryTest.java | 6 +-
.../ExportImportListCompressionDictionaryTest.java | 246 ++++++++++++++++
.../utils/CompressionDictionaryHelper.java | 70 +++++
23 files changed, 1561 insertions(+), 243 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 332107e491..2dd91eacdf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add export, list, import sub-commands for nodetool compressiondictionary
(CASSANDRA-20941)
* Add support in the binary protocol to allow transactions to have multiple
conditions (CASSANDRA-20883)
* Enable CQLSSTableWriter to create SSTables compressed with a dictionary
(CASSANDRA-20938)
* Support ZSTD dictionary compression (CASSANDRA-17021)
diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc
b/doc/modules/cassandra/pages/managing/operating/compression.adoc
index bc0a330183..22214e046b 100644
--- a/doc/modules/cassandra/pages/managing/operating/compression.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -407,6 +407,20 @@ require retraining dictionaries to maintain optimal
compression ratios.
Monitor the `SSTable Compression Ratio` via `nodetool tablestats` to
detect degradation.
+=== Available nodetool commands for compressiondictionary
+
+There are currently four commands related to compression dictionaries:
+
+* train - a user can train a dictionary, as described above.
+* list - a user can list all dictionaries for a given keyspace and table
+* export - a user can export a compression dictionary of a keyspace and a
table, either the last one or
+by a specific id, to a file.
+* import - a user can import a compression dictionary, exported by the above
command, from a file to a cluster.
+
+Importing a dictionary to a table from a file should happen only against one
node at a time as
dictionary will eventually be stored in the
`system_distributed.compression_dictionaries` table and reused
+cluster-wide. When imports happen from multiple nodes, the highest-version
dictionary will be used.
+
== Advanced Use
Advanced users can provide their own compression class by implementing
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index d982210022..f8e4e500ac 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -49,6 +49,13 @@ public interface CompressionDictionary extends AutoCloseable
*/
byte[] rawDictionary();
+ /**
+ * Get checksum of this dictionary.
+ *
+ * @return checksum of this dictionary
+ */
+ int checksum();
+
/**
* Get the kind of the compression algorithm
*
@@ -126,7 +133,7 @@ public interface CompressionDictionary extends AutoCloseable
throw new IOException("Compression dictionary checksum does not
match. " +
"Expected: " + checksum + "; actual: " +
calculatedChecksum);
- CompressionDictionary dictionary = kind.createDictionary(dictId, dict);
+ CompressionDictionary dictionary = kind.createDictionary(dictId, dict,
checksum);
// update the dictionary manager if it exists
if (manager != null)
@@ -137,6 +144,29 @@ public interface CompressionDictionary extends
AutoCloseable
return dictionary;
}
+ static LightweightCompressionDictionary
createFromRowLightweight(UntypedResultSet.Row row)
+ {
+ String kindStr = row.getString("kind");
+ long dictId = row.getLong("dict_id");
+ int checksum = row.getInt("dict_checksum");
+ int size = row.getInt("dict_length");
+ String keyspaceName = row.getString("keyspace_name");
+ String tableName = row.getString("table_name");
+
+ try
+ {
+ return new LightweightCompressionDictionary(keyspaceName,
+ tableName,
+ new
DictId(CompressionDictionary.Kind.valueOf(kindStr), dictId),
+ checksum,
+ size);
+ }
+ catch (IllegalArgumentException ex)
+ {
+ throw new IllegalStateException(kindStr + " compression dictionary
is not created for dict id " + dictId);
+ }
+ }
+
static CompressionDictionary createFromRow(UntypedResultSet.Row row)
{
String kindStr = row.getString("kind");
@@ -164,7 +194,7 @@ public interface CompressionDictionary extends AutoCloseable
kindStr,
dictId, storedChecksum, calculatedChecksum));
}
- return kind.createDictionary(new DictId(kind, dictId), dict);
+ return kind.createDictionary(new DictId(kind, dictId),
row.getByteArray("dict"), storedChecksum);
}
catch (IllegalArgumentException ex)
{
@@ -188,9 +218,10 @@ public interface CompressionDictionary extends
AutoCloseable
// Order matters: the enum ordinal is serialized
ZSTD
{
- public CompressionDictionary createDictionary(DictId dictId,
byte[] dict)
+ @Override
+ public CompressionDictionary createDictionary(DictId dictId,
byte[] dict, int checksum)
{
- return new ZstdCompressionDictionary(dictId, dict);
+ return new ZstdCompressionDictionary(dictId, dict, checksum);
}
@Override
@@ -220,9 +251,10 @@ public interface CompressionDictionary extends
AutoCloseable
*
* @param dictId the dictionary identifier
* @param dict the raw dictionary bytes
+ * @param checksum checksum of this dictionary
* @return a compression dictionary instance
*/
- public abstract CompressionDictionary
createDictionary(CompressionDictionary.DictId dictId, byte[] dict);
+ public abstract CompressionDictionary
createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int
checksum);
/**
* Creates a dictionary compressor for this kind
@@ -281,4 +313,32 @@ public interface CompressionDictionary extends
AutoCloseable
'}';
}
}
+
+ /**
+ * The purpose of a lightweight dictionary is to avoid carrying the actual
dictionary bytes, for performance reasons.
+ * Handy for situations when retrieval from the database does not need to
contain dictionary
+ * or the instantiation of a proper dictionary object is not desirable or
unnecessary for other,
+ * mostly performance-related, reasons.
+ */
+ class LightweightCompressionDictionary
+ {
+ public final String keyspaceName;
+ public final String tableName;
+ public final DictId dictId;
+ public final int checksum;
+ public final int size;
+
+ public LightweightCompressionDictionary(String keyspaceName,
+ String tableName,
+ DictId dictId,
+ int checksum,
+ int size)
+ {
+ this.keyspaceName = keyspaceName;
+ this.tableName = tableName;
+ this.dictId = dictId;
+ this.checksum = checksum;
+ this.size = size;
+ }
+ }
}
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
new file mode 100644
index 0000000000..af6c8bb4f4
--- /dev/null
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.util.Arrays;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import org.apache.cassandra.io.util.FileUtils;
+
+import static java.lang.String.format;
+
+public class CompressionDictionaryDetailsTabularData
+{
+ /**
+ * Position inside index names of tabular type of tabular data returned
upon
+ * listing dictionaries where raw dictionary is expected to be located.
+ * We do not need to process this entry as listing does not contain any
raw dictionary,
+ * only exporting does.
+ */
+ public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3;
+
+ public static final String KEYSPACE_NAME = "Keyspace";
+ public static final String TABLE_NAME = "Table";
+ public static final String DICT_ID_NAME = "DictId";
+ public static final String DICT_NAME = "Dict";
+ public static final String KIND_NAME = "Kind";
+ public static final String CHECKSUM_NAME = "Checksum";
+ public static final String SIZE_NAME = "Size";
+
+
+ private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME,
+ TABLE_NAME,
+ DICT_ID_NAME,
+ DICT_NAME,
+ KIND_NAME,
+ CHECKSUM_NAME,
+ SIZE_NAME };
+
+ private static final String[] ITEM_DESCS = new String[]{ "keyspace",
+ "table",
+ "dictionary_id",
+
"dictionary_bytes",
+ "kind",
+ "checksum",
+ "size" };
+
+ private static final String TYPE_NAME = "DictionaryDetails";
+ private static final String ROW_DESC = "DictionaryDetails";
+ private static final OpenType<?>[] ITEM_TYPES;
+ private static final CompositeType COMPOSITE_TYPE;
+ public static final TabularType TABULAR_TYPE;
+
+ static
+ {
+ try
+ {
+ ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace
+ SimpleType.STRING, // table
+ SimpleType.LONG, // dict id
+ new
ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes
+ SimpleType.STRING, // kind
+ SimpleType.INTEGER, // checksum
+ SimpleType.INTEGER }; // size of dict
bytes
+
+ COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC,
ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
+ TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC,
COMPOSITE_TYPE, ITEM_NAMES);
+ }
+ catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This method is meant to be called when listing dictionaries; we do not
need actual dictionary byte arrays.
+ *
+ * @param dictionary lightweight dictionary to create composite data from
+ * @return composite data representing dictionary
+ */
+ public static CompositeData
fromLightweightCompressionDictionary(LightweightCompressionDictionary
dictionary)
+ {
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE,
+ ITEM_NAMES,
+ new Object[]
+ {
+ dictionary.keyspaceName,
+ dictionary.tableName,
+ dictionary.dictId.id,
+ null, // on purpose not returning
actual dictionary
+ dictionary.dictId.kind.name(),
+ dictionary.checksum,
+ dictionary.size,
+ });
+ }
+ catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Used upon exporting a dictionary.
+ *
+ * @param keyspace keyspace of a dictionary
+ * @param table table of a dictionary
+ * @param dictionary dictionary itself
+ * @return composite data representing dictionary
+ */
+ public static CompositeData fromCompressionDictionary(String keyspace,
String table, CompressionDictionary dictionary)
+ {
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE,
+ ITEM_NAMES,
+ new Object[]
+ {
+ keyspace,
+ table,
+ dictionary.dictId().id,
+ dictionary.rawDictionary(),
+ dictionary.kind().name(),
+ dictionary.checksum(),
+ dictionary.rawDictionary().length,
+ });
+ }
+ catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Used upon deserialisation of data to composite data, e.g. upon
importing a dictionary from JSON
+ * string to CompositeData before they are sent over JMX to import method.
+ *
+ * @param dataObject data class object to get composite data from
+ * @return composite data representing data class
+ */
+ public static CompositeData
fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject)
+ {
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE,
+ ITEM_NAMES,
+ new Object[]
+ {
+ dataObject.keyspace,
+ dataObject.table,
+ dataObject.dictId,
+ dataObject.dict,
+ dataObject.kind,
+ dataObject.dictChecksum,
+ dataObject.dictLength
+ });
+ }
+ catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Deserializes data to convenience object to work further with.
+ *
+ * @param compositeData data to create data object from
+ * @return deserialized composite data to convenience object
+ * @throws IllegalArgumentException if values in deserialized object are
invalid.
+ */
+ public static CompressionDictionaryDataObject
fromCompositeData(CompositeData compositeData)
+ {
+ return new CompressionDictionaryDataObject((String)
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME),
+ (String)
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME),
+ (Long)
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME),
+ (byte[])
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME),
+ (String)
compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME),
+ (Integer)
compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME),
+ (Integer)
compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME));
+ }
+
+ public static class CompressionDictionaryDataObject
+ {
+ public final String keyspace;
+ public final String table;
+ public final long dictId;
+ public final byte[] dict;
+ public final String kind;
+ public final int dictChecksum;
+ public final int dictLength;
+
+ @JsonCreator
+ public CompressionDictionaryDataObject(@JsonProperty("keyspace")
String keyspace,
+ @JsonProperty("table") String
table,
+ @JsonProperty("dictId") long
dictId,
+ @JsonProperty("dict") byte[]
dict,
+ @JsonProperty("kind") String
kind,
+ @JsonProperty("dictChecksum")
int dictChecksum,
+ @JsonProperty("dictLength") int
dictLength)
+ {
+ this.keyspace = keyspace;
+ this.table = table;
+ this.dictId = dictId;
+ this.dict = dict;
+ this.kind = kind;
+ this.dictChecksum = dictChecksum;
+ this.dictLength = dictLength;
+
+ validate();
+ }
+
+ /**
+ * An object of this class is considered to be valid if:
+ *
+ * <ul>
+ * <li>keyspace and table are not null</li>
+ * <li>dict id is bigger than 0</li>
+ * <li>dict is not null nor empty</li>
+ * <li>dict length is less than or equal to 1MiB</li>
+ * <li>kind is not null and there is such {@link
org.apache.cassandra.db.compression.CompressionDictionary.Kind}</li>
+ * <li>dictLength is bigger than 0</li>
+ * <li>dictLength has to be equal to dict's length</li>
+ * <li>dictChecksum has to be equal to checksum computed as part
of this method</li>
+ * </ul>
+ */
+ private void validate()
+ {
+ if (keyspace == null)
+ throw new IllegalArgumentException("Keyspace not specified.");
+ if (table == null)
+ throw new IllegalArgumentException("Table not specified.");
+ if (dictId <= 0)
+ throw new IllegalArgumentException("Provided dictionary id
must be positive but it is '" + dictId + "'.");
+ if (dict == null || dict.length == 0)
+ throw new IllegalArgumentException("Provided dictionary byte
array is null or empty.");
+ if (dict.length > FileUtils.ONE_MIB)
+ throw new IllegalArgumentException("Imported dictionary can
not be larger than " +
+ FileUtils.ONE_MIB + "
bytes, but it is " +
+ dict.length + " bytes.");
+ if (kind == null)
+ throw new IllegalArgumentException("Provided kind is null.");
+
+ CompressionDictionary.Kind dictionaryKind;
+
+ try
+ {
+ dictionaryKind = CompressionDictionary.Kind.valueOf(kind);
+ }
+ catch (IllegalArgumentException ex)
+ {
+ throw new IllegalArgumentException("There is no such
dictionary kind like '" + kind + "'. Available kinds: " +
Arrays.asList(CompressionDictionary.Kind.values()));
+ }
+
+ if (dictLength <= 0)
+ throw new IllegalArgumentException("Size has to be strictly
positive number, it is '" + dictLength + "'.");
+ if (dict.length != dictLength)
+ throw new IllegalArgumentException("The length of the provided
dictionary array (" + dict.length + ") is not equal to provided length value ("
+ dictLength + ").");
+
+ int checksumOfDictionaryToImport =
CompressionDictionary.calculateChecksum((byte) dictionaryKind.ordinal(),
dictId, dict);
+ if (checksumOfDictionaryToImport != dictChecksum)
+ {
+ throw new IllegalArgumentException(format("Computed checksum
of dictionary to import (%s) is different from checksum specified on input
(%s).",
+
checksumOfDictionaryToImport,
+ dictChecksum));
+ }
+ }
+ }
+}
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
index f0193de063..f5e34dbdfe 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
@@ -86,7 +86,7 @@ public class CompressionDictionaryEventHandler implements
ICompressionDictionary
return;
}
- CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, dictionaryId);
+ CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, dictionaryId.id);
cache.add(dictionary);
}
catch (Exception e)
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
index 7350f18ae9..f235f855fe 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
@@ -19,9 +19,12 @@
package org.apache.cassandra.db.compression;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -29,12 +32,16 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MBeanWrapper.OnException;
+import static java.lang.String.format;
+
public class CompressionDictionaryManager implements
CompressionDictionaryManagerMBean,
ICompressionDictionaryCache,
ICompressionDictionaryEventHandler,
@@ -244,6 +251,82 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
return dictionaryTrainer.getTrainingState().toCompositeData();
}
+ @Override
+ public TabularData listCompressionDictionaries()
+ {
+ List<LightweightCompressionDictionary> dictionaries =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName,
tableName);
+ TabularDataSupport tableData = new
TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
+
+ if (dictionaries == null)
+ {
+ return tableData;
+ }
+
+ for (LightweightCompressionDictionary dictionary : dictionaries)
+ {
+
tableData.put(CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(dictionary));
+ }
+
+ return tableData;
+ }
+
+ @Override
+ public CompositeData getCompressionDictionary()
+ {
+ CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName,
tableName);
+ if (compressionDictionary == null)
+ return null;
+
+ return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, compressionDictionary);
+ }
+
+ @Override
+ public CompositeData getCompressionDictionary(long dictId)
+ {
+ CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, dictId);
+ if (compressionDictionary == null)
+ return null;
+
+ return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, compressionDictionary);
+ }
+
+ @Override
+ public synchronized void importCompressionDictionary(CompositeData
compositeData)
+ {
+ if (!isEnabled)
+ {
+ throw new IllegalStateException(format("The compression on table
%s.%s is not enabled or SSTable compressor is not a dictionary compressor.",
+ keyspaceName, tableName));
+ }
+
+ CompressionDictionaryDataObject dataObject =
CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData);
+
+ if (!keyspaceName.equals(dataObject.keyspace) ||
!tableName.equals(dataObject.table))
+ throw new IllegalArgumentException(format("Keyspace and table of a
dictionary to import (%s.%s) does not correspond to the keyspace and table this
manager is responsible for (%s.%s)",
+ dataObject.keyspace,
dataObject.table,
+ keyspaceName,
tableName));
+
+ CompressionDictionary.Kind kind =
CompressionDictionary.Kind.valueOf(dataObject.kind);
+
+ if (trainer.kind() != kind)
+ {
+ throw new IllegalArgumentException(format("It is not possible to
import compression dictionaries of kind " +
+ "%s into table %s.%s
which supports compression dictionaries of kind %s.",
+ kind, keyspaceName,
tableName, trainer.kind()));
+ }
+
+ CompressionDictionary.DictId dictId = new
CompressionDictionary.DictId(kind, dataObject.dictId);
+
+ LightweightCompressionDictionary latestCompressionDictionary =
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
tableName);
+ if (latestCompressionDictionary != null &&
latestCompressionDictionary.dictId.id > dictId.id)
+ {
+ throw new IllegalArgumentException(format("Dictionary to import
has older dictionary id (%s) than the latest compression dictionary (%s) for
table %s.%s",
+ dictId.id,
latestCompressionDictionary.dictId.id, keyspaceName, tableName));
+ }
+
+ handleNewDictionary(kind.createDictionary(dictId, dataObject.dict,
dataObject.dictChecksum));
+ }
+
/**
* Close all the resources. The method can be called multiple times.
*/
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
index b97af20859..3dc349d115 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.db.compression;
+import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
public interface CompressionDictionaryManagerMBean
{
@@ -33,7 +35,7 @@ public interface CompressionDictionaryManagerMBean
* @param force force the dictionary training even if there are not enough
samples;
* otherwise, dictionary training won't start if the trainer
is not ready
* @throws UnsupportedOperationException if table doesn't support
dictionary compression
- * @throws IllegalStateException if no SSTables available after flush
+ * @throws IllegalStateException if no SSTables available after
flush
*/
void train(boolean force);
@@ -44,4 +46,41 @@ public interface CompressionDictionaryManagerMBean
* @return CompositeData representing {@link TrainingState}
*/
CompositeData getTrainingState();
+
+ /**
+ * Lists compression dictionaries for given keyspace and table. Returned
compression dictionaries
+ * do not contain raw dictionary bytes.
+ *
+ * @return compression dictionaries for given keyspace and table
+ */
+ TabularData listCompressionDictionaries();
+
+ /**
+ * Get latest compression dictionary.
+ *
+ * @return CompositeData representing compression dictionary or null if
not found
+ */
+ @Nullable
+ CompositeData getCompressionDictionary();
+
+ /**
+ * Get compression dictionary.
+ *
+ * @param dictId id of compression dictionary to get
+ * @return CompositeData representing compression dictionary or null if
not found
+ */
+ @Nullable
+ CompositeData getCompressionDictionary(long dictId);
+
+ /**
+ * Import a compression dictionary.
+ *
+ * @param dictionary compression dictionary to import
+ * @throws IllegalArgumentException when dictionary to import is older
(based on dictionary id) than
+ * the latest compression dictionary for given table, or when dictionary
data are invalid
+ * @throws IllegalStateException if underlying table does not support
dictionary compression or
+ * kind of dictionary to import does not
match kind of dictionary table
+ * is configured for
+ */
+ void importCompressionDictionary(CompositeData dictionary);
}
diff --git
a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
index a30d011ade..a6935b311e 100644
---
a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
+++
b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
@@ -22,6 +22,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,16 +39,26 @@ public class ZstdCompressionDictionary implements
CompressionDictionary, SelfRef
private final DictId dictId;
private final byte[] rawDictionary;
+ private final int checksum;
// One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be
derived from the same raw dictionary content
private final ConcurrentHashMap<Integer, ZstdDictCompress>
zstdDictCompressPerLevel = new ConcurrentHashMap<>();
private volatile ZstdDictDecompress dictDecompress;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Ref<ZstdCompressionDictionary> selfRef;
+ @VisibleForTesting
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary)
+ {
+ this(dictId,
+ rawDictionary,
+ CompressionDictionary.calculateChecksum((byte)
dictId.kind.ordinal(), dictId.id, rawDictionary));
+ }
+
+ public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int
checksum)
{
this.dictId = dictId;
this.rawDictionary = rawDictionary;
+ this.checksum = checksum;
this.selfRef = new Ref<>(this, new Tidy(zstdDictCompressPerLevel,
dictDecompress));
}
@@ -69,6 +80,12 @@ public class ZstdCompressionDictionary implements
CompressionDictionary, SelfRef
return rawDictionary;
}
+ @Override
+ public int checksum()
+ {
+ return checksum;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
index f08401ae5d..81e99c22bc 100644
--- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
+++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
@@ -136,7 +136,8 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
DictId dictId = new DictId(Kind.ZSTD,
makeDictionaryId(Clock.Global.currentTimeMillis(), zstdDictId));
currentTrainingStatus = TrainingStatus.COMPLETED;
logger.debug("New dictionary is trained with {}", dictId);
- CompressionDictionary dictionary = new
ZstdCompressionDictionary(dictId, dictBytes);
+ int checksum = CompressionDictionary.calculateChecksum((byte)
dictId.kind.ordinal(), dictId.id, dictBytes);
+ CompressionDictionary dictionary =
Kind.ZSTD.createDictionary(dictId, dictBytes, checksum);
notifyDictionaryTrainedListener(dictionary);
return dictionary;
}
diff --git
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index e7769db182..0a29e72961 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -49,6 +50,7 @@ import
org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.db.compression.CompressionDictionary;
@@ -417,7 +419,6 @@ public final class SystemDistributedKeyspace
public static void storeCompressionDictionary(String keyspaceName, String
tableName, CompressionDictionary dictionary)
{
byte[] dict = dictionary.rawDictionary();
- int checksum = CompressionDictionary.calculateChecksum((byte)
dictionary.kind().ordinal(), dictionary.dictId().id, dict);
String query = "INSERT INTO %s.%s (keyspace_name, table_name, kind,
dict_id, dict, dict_length, dict_checksum) VALUES ('%s', '%s', '%s', %s, ?, %s,
%s)";
String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
@@ -427,7 +428,7 @@ public final class SystemDistributedKeyspace
dictionary.kind(),
dictionary.dictId().id,
dict.length,
- checksum);
+ dictionary.checksum());
noThrow(fmtQuery,
() -> QueryProcessor.process(fmtQuery, ConsistencyLevel.ONE,
Collections.singletonList(ByteBuffer.wrap(dict))));
@@ -448,8 +449,7 @@ public final class SystemDistributedKeyspace
String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
try
{
- UntypedResultSet.Row row = QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one();
- return CompressionDictionary.createFromRow(row);
+ return
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
}
catch (Exception e)
{
@@ -458,7 +458,32 @@ public final class SystemDistributedKeyspace
}
/**
- * Retrieves a specific compression dictionary for a given keyspace and
table.
+ * Retrieves the latest compression dictionary for a given keyspace and
table
+ * backed by {@link LightweightCompressionDictionary} object.
+ *
+ * @param keyspaceName the keyspace name to retrieve the dictionary for
+ * @param tableName the table name to retrieve the dictionary for
+ * @return the latest compression dictionary for the specified keyspace
and table,
+ * or null if no dictionary exists or if an error occurs during
retrieval
+ */
+ @Nullable
+ public static LightweightCompressionDictionary
retrieveLightweightLatestCompressionDictionary(String keyspaceName, String
tableName)
+ {
+ String query = "SELECT keyspace_name, table_name, kind, dict_id,
dict_checksum, dict_length FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s' LIMIT 1";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
+ try
+ {
+ return
CompressionDictionary.createFromRowLightweight(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Retrieves a specific compression dictionary, identified by its
dictionary id, for a given keyspace and table.
*
* @param keyspaceName the keyspace name to retrieve the dictionary for
* @param tableName the table name to retrieve the dictionary for
@@ -466,14 +491,47 @@ public final class SystemDistributedKeyspace
* @return the compression dictionary identified by the specified
keyspace, table and dictionaryId,
* or null if no dictionary exists or if an error occurs during
retrieval
*/
- public static CompressionDictionary retrieveCompressionDictionary(String
keyspaceName, String tableName, CompressionDictionary.DictId dictionaryId)
+ @Nullable
+ public static CompressionDictionary retrieveCompressionDictionary(String
keyspaceName, String tableName, long dictionaryId)
{
String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND dict_id=%s";
- String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, dictionaryId.id);
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, dictionaryId);
try
{
- UntypedResultSet.Row row = QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one();
- return CompressionDictionary.createFromRow(row);
+ return
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Retrieves all dictionaries for a given keyspace and table.
+ *
+ * @param keyspaceName the keyspace name to retrieve the dictionary for
+ * @param tableName the table name to retrieve the dictionary for
+ * @return the compression dictionaries identified by the specified
keyspace and table,
+ * an empty list if no dictionary exists, or null if an error occurs
during retrieval
+ */
+ @Nullable
+ public static List<LightweightCompressionDictionary>
retrieveLightweightCompressionDictionaries(String keyspaceName, String
tableName)
+ {
+ String query = "SELECT keyspace_name, table_name, kind, dict_id,
dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s'";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
+ try
+ {
+ UntypedResultSet result = QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE);
+ if (result.isEmpty())
+ return Collections.emptyList();
+ List<LightweightCompressionDictionary> dictionaries = new
ArrayList<>();
+ Iterator<UntypedResultSet.Row> iterator = result.iterator();
+ while (iterator.hasNext())
+ {
+
dictionaries.add(CompressionDictionary.createFromRowLightweight(iterator.next()));
+ }
+
+ return dictionaries;
}
catch (Exception e)
{
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3430443423..662716f5e9 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -89,6 +89,7 @@ import org.apache.cassandra.batchlog.BatchlogManagerMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
import org.apache.cassandra.db.compression.CompressionDictionaryManagerMBean;
import org.apache.cassandra.db.compression.TrainingState;
import org.apache.cassandra.db.guardrails.Guardrails;
@@ -2698,10 +2699,78 @@ public class NodeProbe implements AutoCloseable
*/
public void trainCompressionDictionary(String keyspace, String table,
boolean force) throws IOException
{
- CompressionDictionaryManagerMBean proxy =
getDictionaryManagerProxy(keyspace, table);
+ doWithCompressionDictionaryManagerMBean(proxy -> { proxy.train(force);
return null; }, keyspace, table);
+ }
+
+ /**
+ * Returns latest dictionary for given keyspace and table.
+ *
+ * @param keyspace the keyspace name
+ * @param table the table name
+ * @return the latest dictionary for given keyspace and table
+ * @throws IOException if there's an error accessing the MBean
+ * @throws IllegalArgumentException if table doesn't support dictionary
compression
+ */
+ public CompositeData getCompressionDictionary(String keyspace, String
table) throws IOException
+ {
+ return
doWithCompressionDictionaryManagerMBean(CompressionDictionaryManagerMBean::getCompressionDictionary,
keyspace, table);
+ }
+
+ /**
+ * Returns the dictionary for given keyspace and table and dictionary id.
+ *
+ * @param keyspace the keyspace name
+ * @param table the table name
+ * @param dictId id of dictionary to get
+ * @return the dictionary for given keyspace and table and dictionary id.
+ * @throws IOException if there's an error accessing the MBean
+ * @throws IllegalArgumentException if table doesn't support dictionary
compression
+ */
+ public CompositeData getCompressionDictionary(String keyspace, String
table, long dictId) throws IOException
+ {
+ return doWithCompressionDictionaryManagerMBean(proxy ->
proxy.getCompressionDictionary(dictId), keyspace, table);
+ }
+
+ /**
+ * Imports dictionary in composite data to database.
+ *
+ * @param compositeData data to import
+ * @throws IOException if there's an error accessing the MBean
+ * @throws IllegalArgumentException if keyspace and table values in
compositeData are missing
+ * or if table doesn't support dictionary compression
+ */
+ public void importCompressionDictionary(CompositeData compositeData)
throws IOException
+ {
+ String keyspace = (String)
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME);
+ String table = (String)
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME);
+
+ if (keyspace == null || table == null)
+ {
+ throw new IllegalArgumentException("Argument must have keyspace
and table values.");
+ }
+
+ doWithCompressionDictionaryManagerMBean(proxy -> {
proxy.importCompressionDictionary(compositeData); return null; }, keyspace,
table);
+ }
+
+ /**
+ * Lists compression dictionaries for the given keyspace and table.
+ *
+ * @param keyspace keyspace to list dictionaries for
+ * @param table table to list dictionaries for
+ * @return tabular data with listing
+ * @throws IOException if there's an error accessing the MBean
+ * @throws IllegalArgumentException if table doesn't support dictionary
compression
+ */
+ public TabularData listCompressionDictionaries(String keyspace, String
table) throws IOException
+ {
+ return
doWithCompressionDictionaryManagerMBean(CompressionDictionaryManagerMBean::listCompressionDictionaries,
keyspace, table);
+ }
+
+ private <T> T
doWithCompressionDictionaryManagerMBean(Function<CompressionDictionaryManagerMBean,
T> func,
+ String keyspace,
String table) throws IOException
+ {
try
{
- proxy.train(force);
+ return func.apply(getDictionaryManagerProxy(keyspace, table));
}
catch (Exception e)
{
@@ -2709,7 +2778,7 @@ public class NodeProbe implements AutoCloseable
{
String message = String.format("Table %s.%s does not exist or
does not support dictionary compression",
keyspace, table);
- throw new IOException(message);
+ throw new IllegalArgumentException(message);
}
else
{
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionary.java
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionary.java
deleted file mode 100644
index 5e05bfdfa8..0000000000
--- a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionary.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tools.nodetool;
-
-import java.io.PrintStream;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
-import org.apache.cassandra.db.compression.TrainingState;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.utils.Clock;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-import picocli.CommandLine.Parameters;
-
-@Command(name = "compressiondictionary",
- description = "Manage compression dictionaries",
- subcommands = { CompressionDictionary.Train.class })
-public class CompressionDictionary
-{
- @Command(name = "train",
- description = "Manually trigger compression dictionary training
for a table. If no SSTables are available, the memtable will be flushed first.")
- public static class Train extends AbstractCommand
- {
- @Parameters(index = "0", description = "The keyspace name", arity =
"1")
- private String keyspace;
-
- @Parameters(index = "1", description = "The table name", arity = "1")
- private String table;
-
- @Option(names = { "-f", "--force" }, description = "Force the
dictionary training even if there are not enough samples")
- private boolean force = false;
-
- @Override
- public void execute(NodeProbe probe)
- {
- PrintStream out = probe.output().out;
- PrintStream err = probe.output().err;
-
- try
- {
- out.printf("Starting compression dictionary training for
%s.%s...%n", keyspace, table);
- out.printf("Training from existing SSTables (flushing first if
needed)%n");
-
- probe.trainCompressionDictionary(keyspace, table, force);
-
- // Wait for training completion (10 minutes timeout for
SSTable-based training)
- out.println("Sampling from existing SSTables and training.");
- long maxWaitMillis = TimeUnit.MINUTES.toMillis(10);
- long startTime = Clock.Global.currentTimeMillis();
-
- while (Clock.Global.currentTimeMillis() - startTime <
maxWaitMillis)
- {
- TrainingState trainingState =
probe.getCompressionDictionaryTrainingState(keyspace, table);
- TrainingStatus status = trainingState.getStatus();
- displayProgress(trainingState, startTime, out, status);
- if (TrainingStatus.COMPLETED == status)
- {
- out.printf("%nTraining completed successfully for
%s.%s%n", keyspace, table);
- return;
- }
- else if (TrainingStatus.FAILED == status)
- {
- err.printf("%nTraining failed for %s.%s%n", keyspace,
table);
- try
- {
- String failureMessage =
trainingState.getFailureMessage();
- if (failureMessage != null &&
!failureMessage.isEmpty())
- {
- err.printf("Reason: %s%n", failureMessage);
- }
- }
- catch (Exception e)
- {
- // If we can't get the failure message, just
continue without it
- }
- System.exit(1);
- }
-
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
-
- err.printf("%nTraining did not complete within expected
timeframe (10 minutes).%n");
- System.exit(1);
- }
- catch (Exception e)
- {
- err.printf("Failed to trigger training: %s%n", e.getMessage());
- System.exit(1);
- }
- }
-
- private static void displayProgress(TrainingState trainingState, long
startTime, PrintStream out, TrainingStatus status)
- {
- // Display meaningful statistics
- long sampleCount = trainingState.getSampleCount();
- long totalSampleSize = trainingState.getTotalSampleSize();
- long elapsedSeconds = (Clock.Global.currentTimeMillis() -
startTime) / 1000;
- double sampleSizeMB = totalSampleSize / (1024.0 * 1024.0);
-
- out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed:
%ds",
- status, sampleCount, sampleSizeMB, elapsedSeconds);
- }
- }
-}
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
new file mode 100644
index 0000000000..02901cd375
--- /dev/null
+++
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
+import
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
+import org.apache.cassandra.db.compression.TrainingState;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.JsonUtils;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.stream.Collectors.joining;
+
+@Command(name = "compressiondictionary",
+ description = "Manage compression dictionaries",
+ subcommands = {
CompressionDictionaryCommandGroup.TrainDictionary.class,
+
CompressionDictionaryCommandGroup.ListDictionaries.class,
+
CompressionDictionaryCommandGroup.ExportDictionary.class,
+
CompressionDictionaryCommandGroup.ImportDictionary.class })
+public class CompressionDictionaryCommandGroup
+{
+ @Command(name = "train",
+ description = "Manually trigger compression dictionary training
for a table. If no SSTables are available, the memtable will be flushed first.")
+ public static class TrainDictionary extends AbstractCommand
+ {
+ @Parameters(index = "0", description = "The keyspace name", arity =
"1")
+ private String keyspace;
+
+ @Parameters(index = "1", description = "The table name", arity = "1")
+ private String table;
+
+ @Option(names = { "-f", "--force" }, description = "Force the
dictionary training even if there are not enough samples")
+ private boolean force = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ PrintStream out = probe.output().out;
+ PrintStream err = probe.output().err;
+
+ try
+ {
+ out.printf("Starting compression dictionary training for
%s.%s...%n", keyspace, table);
+ out.printf("Training from existing SSTables (flushing first if
needed)%n");
+
+ probe.trainCompressionDictionary(keyspace, table, force);
+
+ // Wait for training completion (10 minutes timeout for
SSTable-based training)
+ out.println("Sampling from existing SSTables and training.");
+ long maxWaitMillis = TimeUnit.MINUTES.toMillis(10);
+ long startTime = Clock.Global.currentTimeMillis();
+
+ while (Clock.Global.currentTimeMillis() - startTime <
maxWaitMillis)
+ {
+ TrainingState trainingState =
probe.getCompressionDictionaryTrainingState(keyspace, table);
+ TrainingStatus status = trainingState.getStatus();
+ displayProgress(trainingState, startTime, out, status);
+ if (TrainingStatus.COMPLETED == status)
+ {
+ out.printf("%nTraining completed successfully for
%s.%s%n", keyspace, table);
+ return;
+ }
+ else if (TrainingStatus.FAILED == status)
+ {
+ err.printf("%nTraining failed for %s.%s%n", keyspace,
table);
+ try
+ {
+ String failureMessage =
trainingState.getFailureMessage();
+ if (failureMessage != null &&
!failureMessage.isEmpty())
+ {
+ err.printf("Reason: %s%n", failureMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ // If we can't get the failure message, just
continue without it
+ }
+ System.exit(1);
+ }
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+
+ err.printf("%nTraining did not complete within expected
timeframe (10 minutes).%n");
+ System.exit(1);
+ }
+ catch (Exception e)
+ {
+ err.printf("Failed to trigger training: %s%n", e.getMessage());
+ System.exit(1);
+ }
+ }
+
+ private static void displayProgress(TrainingState trainingState, long
startTime, PrintStream out, TrainingStatus status)
+ {
+ // Display meaningful statistics
+ long sampleCount = trainingState.getSampleCount();
+ long totalSampleSize = trainingState.getTotalSampleSize();
+ long elapsedSeconds = (Clock.Global.currentTimeMillis() -
startTime) / 1000;
+ double sampleSizeMB = totalSampleSize / (1024.0 * 1024.0);
+
+ out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed:
%ds",
+ status, sampleCount, sampleSizeMB, elapsedSeconds);
+ }
+ }
+
+ @Command(name = "list",
+ description = "List available dictionaries of specific keyspace
and table.")
+ public static class ListDictionaries extends AbstractCommand
+ {
+ @Parameters(index = "0", description = "The keyspace name", arity =
"1")
+ private String keyspace;
+
+ @Parameters(index = "1", description = "The table name", arity = "1")
+ private String table;
+
+ @Override
+ protected void execute(NodeProbe probe)
+ {
+ try
+ {
+ TableBuilder tableBuilder = new TableBuilder();
+ TabularData tabularData =
probe.listCompressionDictionaries(keyspace, table);
+ List<String> indexNames =
tabularData.getTabularType().getIndexNames();
+
+ List<String> columns = new ArrayList<>(indexNames);
+ // ignore raw dict
+
columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX);
+ tableBuilder.add(columns);
+
+ for (Object eachDict : tabularData.keySet())
+ {
+ final List<?> dictRow = (List<?>) eachDict;
+
+ List<String> rowValues = new ArrayList<>();
+
+ for (int i = 0; i < dictRow.size(); i++)
+ {
+ // ignore raw dict
+ if (i ==
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX)
+ continue;
+
+ rowValues.add(dictRow.get(i).toString());
+ }
+ tableBuilder.add(rowValues);
+ }
+
+ tableBuilder.printTo(probe.output().out);
+ }
+ catch (Exception e)
+ {
+ probe.output().err.printf("Failed to list dictionaries: %s%n",
e.getMessage());
+ System.exit(1);
+ }
+ }
+ }
+
+ @Command(name = "export",
+ description = "Export dictionary from Cassandra to local file.")
+ public static class ExportDictionary extends AbstractCommand
+ {
+ @Parameters(index = "0", description = "The keyspace name", arity =
"1")
+ private String keyspace;
+
+ @Parameters(index = "1", description = "The table name", arity = "1")
+ private String table;
+
+ @Parameters(index = "2", description = "File name to save dictionary
to", arity = "1")
+ private String dictionaryPath;
+
+ @Option(paramLabel = "dictId", names = { "-i", "--id" },
+ description = "The dictionary id. When not specified, the current
dictionary is returned.")
+ private long dictId = -1;
+
+ @Override
+ protected void execute(NodeProbe probe)
+ {
+ if (dictId <= 0 && dictId != -1)
+ {
+ probe.output().err.printf("Dictionary id has to be strictly
positive number.%n");
+ System.exit(1);
+ }
+
+ try
+ {
+ CompositeData compressionDictionary;
+
+ if (dictId == -1)
+ {
+ compressionDictionary =
probe.getCompressionDictionary(keyspace, table);
+ }
+ else
+ {
+ compressionDictionary =
probe.getCompressionDictionary(keyspace, table, dictId);
+ }
+
+ if (compressionDictionary == null)
+ {
+ probe.output().err.printf("Dictionary%s does not exist for
%s.%s.%n",
+ dictId == -1 ? "" : " with id "
+ dictId,
+ keyspace, table);
+ System.exit(1);
+ }
+
+ CompressionDictionaryDataObject dataObject =
CompressionDictionaryDetailsTabularData.fromCompositeData(compressionDictionary);
+ String dictionary =
JsonUtils.writeAsPrettyJsonString(dataObject);
+ FileUtils.write(new File(dictionaryPath), List.of(dictionary),
CREATE, TRUNCATE_EXISTING, WRITE);
+ }
+ catch (Throwable e)
+ {
+ probe.output().err.printf("Failed to export dictionary: %s%n",
e.getMessage());
+ System.exit(1);
+ }
+ }
+ }
+
+ @Command(name = "import",
+ description = "Import local dictionary to Cassandra.")
+ public static class ImportDictionary extends AbstractCommand
+ {
+ @Parameters(index = "0", description = "The path to a dictionary
JSON", arity = "1")
+ private String dictionaryPath;
+
+ @Override
+ protected void execute(NodeProbe probe)
+ {
+ File dictionaryFile = new File(dictionaryPath);
+ validateFile(dictionaryFile, probe);
+
+ try
+ {
+ String jsonContent =
FileUtils.readLines(dictionaryFile).stream().collect(joining(lineSeparator()));
+ CompressionDictionaryDataObject dictionaryDataObject =
JsonUtils.JSON_OBJECT_MAPPER.readValue(jsonContent,
CompressionDictionaryDataObject.class);
+ CompositeData compositeData =
CompressionDictionaryDetailsTabularData.fromCompressionDictionaryDataObject(dictionaryDataObject);
+ probe.importCompressionDictionary(compositeData);
+ }
+ catch (ValueInstantiationException ex)
+ {
+ // we catch this when validation of data object fails - that
will happen when
+ // JSON is invalid, and we attempt to deserialize it to data
object by Jackson
+ // We can fail fast on the client, so we will never reach
Cassandra node with a payload
+ // which would fail there too, so we do not contact a node
unnecessarily.
+ probe.output().err.printf("Unable to import dictionary JSON:
%s%n", ex.getCause().getMessage());
+ System.exit(1);
+ }
+ catch (Throwable t)
+ {
+ probe.output().err.printf("Unable to import dictionary JSON:
%s%n", t.getMessage());
+ System.exit(1);
+ }
+ }
+
+ private void validateFile(File dictionaryFile, NodeProbe probe)
+ {
+ if (!dictionaryFile.exists())
+ {
+ probe.output().err.printf("Path %s does not exist.%n",
dictionaryPath);
+ System.exit(1);
+ }
+ if (!dictionaryFile.isFile())
+ {
+ probe.output().err.printf("Path %s is not a file.%n",
dictionaryPath);
+ System.exit(1);
+ }
+ if (!dictionaryFile.isReadable())
+ {
+ probe.output().err.printf("Path %s is not readable.%n",
dictionaryPath);
+ System.exit(1);
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
index 99f26c813c..98793e2748 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
@@ -207,7 +207,7 @@ import static
org.apache.cassandra.tools.nodetool.Help.printTopCommandUsage;
TableStats.class,
TopPartitions.class,
TpStats.class,
- CompressionDictionary.class,
+ CompressionDictionaryCommandGroup.class,
TruncateHints.class,
UpdateCIDRGroup.class,
UpgradeSSTable.class,
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary
index b3aadae01f..5a65cae36d 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary
@@ -14,6 +14,24 @@ SYNOPSIS
[(-u <username> | --username <username>)]
compressiondictionary train
[(-f | --force)] [--] <keyspace> <table>
+ nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+ [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
+ [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
+ [(-u <username> | --username <username>)]
compressiondictionary list
+ [--] <keyspace> <table>
+
+ nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+ [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
+ [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
+ [(-u <username> | --username <username>)]
compressiondictionary export
+ [(-i <dictId> | --id <dictId>)] [--] <keyspace> <table>
<dictionaryPath>
+
+ nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+ [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
+ [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
+ [(-u <username> | --username <username>)]
compressiondictionary import
+ [--] <dictionaryPath>
+
OPTIONS
-h <host>, --host <host>
Node hostname or ip address
@@ -42,3 +60,12 @@ COMMANDS
With --force option, Force the dictionary training even if there
are not
enough samples
+ list
+ List available dictionaries of specific keyspace and table.
+ export
+ Export dictionary from Cassandra to local file.
+
+ With --id option, The dictionary id. When not specified, the
current
+ dictionary is returned.
+ import
+ Import local dictionary to Cassandra.
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary$export
similarity index 50%
copy from test/resources/nodetool/help/compressiondictionary
copy to test/resources/nodetool/help/compressiondictionary$export
index b3aadae01f..fc2f24ef7c 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary$export
@@ -1,23 +1,22 @@
NAME
- nodetool compressiondictionary - Manage compression dictionaries
+ nodetool compressiondictionary export - Export dictionary from
+ Cassandra to local file.
SYNOPSIS
nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
[(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)] compressiondictionary
- <command> [<args>]
-
- nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
- [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
- [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)]
compressiondictionary train
- [(-f | --force)] [--] <keyspace> <table>
+ [(-u <username> | --username <username>)]
compressiondictionary export
+ [(-i <dictId> | --id <dictId>)] [--] <keyspace> <table>
<dictionaryPath>
OPTIONS
-h <host>, --host <host>
Node hostname or ip address
+ -i <dictId>, --id <dictId>
+ The dictionary id. When not specified, the current dictionary is
+ returned.
+
-p <port>, --port <port>
Remote jmx agent port number
@@ -33,12 +32,16 @@ OPTIONS
-u <username>, --username <username>
Remote jmx agent username
-COMMANDS
- With no arguments, Display help information
+ --
+ This option can be used to separate command-line options from the
+ list of argument, (useful when arguments might be mistaken for
+ command-line options
+
+ <keyspace>
+ The keyspace name
- train
- Manually trigger compression dictionary training for a table. If no
- SSTables are available, the memtable will be flushed first.
+ <table>
+ The table name
- With --force option, Force the dictionary training even if there
are not
- enough samples
+ <dictionaryPath>
+ File name to save dictionary to
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary$import
similarity index 50%
copy from test/resources/nodetool/help/compressiondictionary
copy to test/resources/nodetool/help/compressiondictionary$import
index b3aadae01f..1bc2998986 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary$import
@@ -1,18 +1,13 @@
NAME
- nodetool compressiondictionary - Manage compression dictionaries
+ nodetool compressiondictionary import - Import local dictionary to
+ Cassandra.
SYNOPSIS
nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
[(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)] compressiondictionary
- <command> [<args>]
-
- nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
- [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
- [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)]
compressiondictionary train
- [(-f | --force)] [--] <keyspace> <table>
+ [(-u <username> | --username <username>)]
compressiondictionary import
+ [--] <dictionaryPath>
OPTIONS
-h <host>, --host <host>
@@ -33,12 +28,10 @@ OPTIONS
-u <username>, --username <username>
Remote jmx agent username
-COMMANDS
- With no arguments, Display help information
-
- train
- Manually trigger compression dictionary training for a table. If no
- SSTables are available, the memtable will be flushed first.
+ --
+ This option can be used to separate command-line options from the
+ list of argument, (useful when arguments might be mistaken for
+ command-line options
- With --force option, Force the dictionary training even if there
are not
- enough samples
+ <dictionaryPath>
+ The path to a dictionary JSON
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary$list
similarity index 50%
copy from test/resources/nodetool/help/compressiondictionary
copy to test/resources/nodetool/help/compressiondictionary$list
index b3aadae01f..e9b37215e0 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary$list
@@ -1,18 +1,13 @@
NAME
- nodetool compressiondictionary - Manage compression dictionaries
+ nodetool compressiondictionary list - List available dictionaries of
+ specific keyspace and table.
SYNOPSIS
nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
[(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)] compressiondictionary
- <command> [<args>]
-
- nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
- [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
- [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
- [(-u <username> | --username <username>)]
compressiondictionary train
- [(-f | --force)] [--] <keyspace> <table>
+ [(-u <username> | --username <username>)]
compressiondictionary list
+ [--] <keyspace> <table>
OPTIONS
-h <host>, --host <host>
@@ -33,12 +28,13 @@ OPTIONS
-u <username>, --username <username>
Remote jmx agent username
-COMMANDS
- With no arguments, Display help information
+ --
+ This option can be used to separate command-line options from the
+ list of argument, (useful when arguments might be mistaken for
+ command-line options
- train
- Manually trigger compression dictionary training for a table. If no
- SSTables are available, the memtable will be flushed first.
+ <keyspace>
+ The keyspace name
- With --force option, Force the dictionary training even if there
are not
- enough samples
+ <table>
+ The table name
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
new file mode 100644
index 0000000000..99ac4c916b
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.util.function.Consumer;
+import javax.management.openmbean.CompositeData;
+
+import org.junit.Test;
+
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CompressionDictionaryHelper;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
/**
 * Unit tests for {@link CompressionDictionaryDataObject}: round-tripping through JMX
 * {@link CompositeData}, conversion from server-side dictionary representations, and
 * the validation rules applied when a data object is (re)constructed.
 */
public class CompressionDictionaryDataObjectTest
{
    private static final String KEYSPACE = "ks";
    private static final String TABLE = "tb";
    // A real trained ZSTD dictionary shared by all tests; training is done once at class init.
    private static final CompressionDictionary COMPRESSION_DICTIONARY = CompressionDictionaryHelper.INSTANCE.trainDictionary(KEYSPACE, TABLE);
    // Baseline valid object; individual validation tests mutate a copy of it via DataObjectModifier.
    private static final CompressionDictionaryDataObject VALID_OBJECT = createValidObject();

    /**
     * Converting a data object to CompositeData and back must preserve every field.
     */
    @Test
    public void testConversionOfCompressionDictionaryDataObjectToCompositeDataAndBack()
    {
        CompositeData compositeData = CompressionDictionaryDetailsTabularData.fromCompressionDictionaryDataObject(VALID_OBJECT);
        CompressionDictionaryDataObject dataObject = CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData);

        assertEquals(VALID_OBJECT.keyspace, dataObject.keyspace);
        assertEquals(VALID_OBJECT.table, dataObject.table);
        assertEquals(VALID_OBJECT.dictId, dataObject.dictId);
        assertArrayEquals(VALID_OBJECT.dict, dataObject.dict);
        assertEquals(VALID_OBJECT.kind, dataObject.kind);
        assertEquals(VALID_OBJECT.dictChecksum, dataObject.dictChecksum);
        assertEquals(VALID_OBJECT.dictLength, dataObject.dictLength);
    }

    /**
     * A CompositeData built straight from a trained {@link CompressionDictionary}
     * must deserialize into a data object matching the dictionary's id, raw bytes,
     * kind, checksum and length.
     */
    @Test
    public void testConversionOfCompressionDictionaryToDataObject()
    {
        CompositeData compositeData = CompressionDictionaryDetailsTabularData.fromCompressionDictionary(KEYSPACE, TABLE, COMPRESSION_DICTIONARY);
        CompressionDictionaryDataObject dataObject = CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData);

        assertEquals(KEYSPACE, dataObject.keyspace);
        assertEquals(TABLE, dataObject.table);
        assertEquals(COMPRESSION_DICTIONARY.dictId().id, dataObject.dictId);
        assertArrayEquals(COMPRESSION_DICTIONARY.rawDictionary(), dataObject.dict);
        assertEquals(COMPRESSION_DICTIONARY.kind().name(), dataObject.kind);
        assertEquals(COMPRESSION_DICTIONARY.checksum(), dataObject.dictChecksum);
        assertEquals(COMPRESSION_DICTIONARY.rawDictionary().length, dataObject.dictLength);
    }

    /**
     * A lightweight dictionary (metadata only, no raw bytes) converts to CompositeData
     * with all metadata fields populated and the dictionary payload left null.
     */
    @Test
    public void testConversionOfLightweightDictionaryToCompositeData()
    {
        LightweightCompressionDictionary lightweight = new LightweightCompressionDictionary(KEYSPACE,
                                                                                            TABLE,
                                                                                            COMPRESSION_DICTIONARY.dictId(),
                                                                                            COMPRESSION_DICTIONARY.checksum(),
                                                                                            COMPRESSION_DICTIONARY.rawDictionary().length);

        CompositeData compositeData = CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(lightweight);

        assertEquals(KEYSPACE, compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME));
        assertEquals(TABLE, compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME));
        assertEquals(COMPRESSION_DICTIONARY.dictId().id, compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME));
        // the lightweight form intentionally carries no dictionary bytes
        assertNull(compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME));
        assertEquals(COMPRESSION_DICTIONARY.dictId().kind.name(), compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME));
        assertEquals(COMPRESSION_DICTIONARY.checksum(), compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME));
        assertEquals(COMPRESSION_DICTIONARY.rawDictionary().length, compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME));
    }

    /**
     * Each mutation below takes the valid baseline object, breaks exactly one field,
     * and expects construction to fail with the given IllegalArgumentException message.
     */
    @Test
    public void testValidation()
    {
        assertInvalid(modifier -> modifier.withKeyspace(null), "Keyspace not specified.");
        assertInvalid(modifier -> modifier.withTable(null), "Table not specified.");
        assertInvalid(modifier -> modifier.withDictId(-1), "Provided dictionary id must be positive but it is '-1'.");
        assertInvalid(modifier -> modifier.withDict(null), "Provided dictionary byte array is null or empty.");
        assertInvalid(modifier -> modifier.withDict(new byte[0]), "Provided dictionary byte array is null or empty.");
        // dictionaries over 1 MiB are rejected on import
        assertInvalid(modifier -> modifier.withDict(new byte[((int) FileUtils.ONE_MIB) + 1]),
                      "Imported dictionary can not be larger than 1048576 bytes, but it is 1048577 bytes.");
        assertInvalid(modifier -> modifier.withKind(null), "Provided kind is null.");
        assertInvalid(modifier -> modifier.withKind("NONSENSE"), "There is no such dictionary kind like 'NONSENSE'. Available kinds: [ZSTD]");
        assertInvalid(modifier -> modifier.withDictLength(0), "Size has to be strictly positive number, it is '0'.");
        assertInvalid(modifier -> modifier.withDictLength(-10), "Size has to be strictly positive number, it is '-10'.");
        // declared length must match the actual array length
        assertInvalid(modifier -> modifier.withDictLength(5),
                      "The length of the provided dictionary array (" + VALID_OBJECT.dictLength +
                      ") is not equal to provided length value (5).");
        // declared checksum must match the checksum recomputed from the payload
        assertInvalid(builder -> builder.withDictChecksum(VALID_OBJECT.dictChecksum + 1),
                      "Computed checksum of dictionary to import (" + VALID_OBJECT.dictChecksum +
                      ") is different from checksum specified on input (" + (VALID_OBJECT.dictChecksum + 1) + ").");
    }

    /**
     * Applies {@code action} to a modifier seeded from the valid baseline and asserts
     * that building the resulting object throws IllegalArgumentException containing
     * {@code expectedExceptionMessage}.
     */
    private void assertInvalid(Consumer<DataObjectModifier> action, String expectedExceptionMessage)
    {
        DataObjectModifier builder = new DataObjectModifier(VALID_OBJECT);
        action.accept(builder);

        assertThatThrownBy(builder::build)
        .hasMessageContaining(expectedExceptionMessage)
        .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Builds a self-consistent data object from the trained dictionary, with the
     * checksum computed over (kind ordinal, id, raw bytes) so validation passes.
     */
    private static CompressionDictionaryDataObject createValidObject()
    {
        return new CompressionDictionaryDataObject("ks",
                                                   "tb",
                                                   123,
                                                   COMPRESSION_DICTIONARY.rawDictionary(),
                                                   CompressionDictionary.Kind.ZSTD.name(),
                                                   CompressionDictionary.calculateChecksum((byte) CompressionDictionary.Kind.ZSTD.ordinal(),
                                                                                           123,
                                                                                           COMPRESSION_DICTIONARY.rawDictionary()),
                                                   COMPRESSION_DICTIONARY.rawDictionary().length);
    }

    /**
     * Copy-and-mutate helper over {@link CompressionDictionaryDataObject}: starts as a
     * field-for-field copy of an existing object and lets a test override single fields
     * before rebuilding (and thereby re-validating) the object.
     */
    private static class DataObjectModifier
    {
        private String keyspace;
        private String table;
        private long dictId;
        private byte[] dict;
        private String kind;
        private int dictChecksum;
        private int dictLength;

        public DataObjectModifier(CompressionDictionaryDataObject from)
        {
            withKeyspace(from.keyspace);
            withTable(from.table);
            withDictId(from.dictId);
            withDict(from.dict);
            withKind(from.kind);
            withDictChecksum(from.dictChecksum);
            withDictLength(from.dictLength);
        }

        // Construction runs the data object's validation, so this is where assertInvalid expects the throw.
        public CompressionDictionaryDataObject build()
        {
            return new CompressionDictionaryDataObject(keyspace, table, dictId, dict, kind, dictChecksum, dictLength);
        }

        public DataObjectModifier withKeyspace(String keyspace)
        {
            this.keyspace = keyspace;
            return this;
        }

        public DataObjectModifier withTable(String table)
        {
            this.table = table;
            return this;
        }

        public DataObjectModifier withDictId(long dictId)
        {
            this.dictId = dictId;
            return this;
        }

        public DataObjectModifier withDict(byte[] dict)
        {
            this.dict = dict;
            return this;
        }

        public DataObjectModifier withKind(String kind)
        {
            this.kind = kind;
            return this;
        }

        public DataObjectModifier withDictChecksum(int dictChecksum)
        {
            this.dictChecksum = dictChecksum;
            return this;
        }

        public DataObjectModifier withDictLength(int dictLength)
        {
            this.dictLength = dictLength;
            return this;
        }
    }
}
diff --git
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 19a9d4cae1..816bc2c2d9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -30,7 +30,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -61,9 +60,7 @@ import org.apache.cassandra.cql3.functions.types.UDTValue;
import org.apache.cassandra.cql3.functions.types.UserType;
import org.apache.cassandra.db.compression.CompressionDictionary;
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
-import org.apache.cassandra.db.compression.CompressionDictionaryTrainingConfig;
import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
-import org.apache.cassandra.db.compression.ZstdDictionaryTrainer;
import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -77,6 +74,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.format.bti.BtiFormat;
+import org.apache.cassandra.utils.CompressionDictionaryHelper;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.locator.RangesAtEndpoint;
@@ -1727,7 +1725,7 @@ public abstract class CQLSSTableWriterTest
+ " PRIMARY KEY (k)"
+ ") WITH compression = {'class':
'ZstdDictionaryCompressor'}";
- CompressionDictionary dictionary =
DictionaryHelper.trainDictionary(keyspace, table);
+ CompressionDictionary dictionary =
CompressionDictionaryHelper.INSTANCE.trainDictionary(keyspace, table);
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
@@ -1738,7 +1736,7 @@ public abstract class CQLSSTableWriterTest
for (int i = 0; i < 500; i++)
{
- writer.addRow(i, DictionaryHelper.INSTANCE.getRandomSample());
+ writer.addRow(i,
CompressionDictionaryHelper.INSTANCE.getRandomSample());
}
writer.close();
@@ -1759,49 +1757,6 @@ public abstract class CQLSSTableWriterTest
}
}
- /**
- * Simple generator of random data for Zstd compression dictionary and
dictionary trainer.
- */
- private static class DictionaryHelper
- {
- public static final DictionaryHelper INSTANCE = new DictionaryHelper();
- private static final Random random = new Random();
-
- private static final String[] dates = new String[]
{"2025-10-20","2025-10-19","2025-10-18","2025-10-17","2025-10-16"};
- private static final String[] times = new String[]
{"11:00:01","11:00:02","11:00:03","11:00:04","11:00:05"};
- private static final String[] levels = new String[] {"TRACE", "DEBUG",
"INFO", "WARN", "ERROR"};
- private static final String[] services = new String[]
{"com.example.UserService", "com.example.DatabasePool",
"com.example.PaymentService", "com.example.OrderService"};
-
- private String getRandomSample()
- {
- return dates[random.nextInt(dates.length)] + ' ' +
- times[random.nextInt(times.length)] + ' ' +
- levels[random.nextInt(levels.length)] + ' ' +
- services[random.nextInt(services.length)] + ' ' +
- UUID.randomUUID(); // message
- }
-
- private static CompressionDictionary trainDictionary(String keyspace,
String table)
- {
- CompressionDictionaryTrainingConfig config =
CompressionDictionaryTrainingConfig
- .builder()
-
.maxDictionarySize(65536)
-
.maxTotalSampleSize(1024 * 1024) // 1MB total
- .build();
-
- try (ZstdDictionaryTrainer trainer = new
ZstdDictionaryTrainer(keyspace, table, config, 3))
- {
- trainer.start(true);
- for (int i = 0; i < 25000; i++)
- {
-
trainer.addSample(UTF8Type.instance.fromString(DictionaryHelper.INSTANCE.getRandomSample()));
- }
-
- return trainer.trainDictionary(false);
- }
- }
- }
-
protected static void loadSSTables(File dataDir, final String ks, final
String tb) throws ExecutionException, InterruptedException
{
SSTableLoader loader = new SSTableLoader(dataDir, new
SSTableLoader.Client()
diff --git
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
index 991435727a..11b0764a1a 100644
---
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
+++
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
@@ -136,9 +136,9 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
// Retrieve specific dictionary by ID
CompressionDictionary dict1 =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, new DictId(Kind.ZSTD, 100L));
+ TEST_KEYSPACE, TEST_TABLE, 100L);
CompressionDictionary dict2 =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, new DictId(Kind.ZSTD, 200L));
+ TEST_KEYSPACE, TEST_TABLE, 200L);
assertThat(dict1)
.as("Should retrieve dictionary 1")
@@ -173,7 +173,7 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
// Try to retrieve specific dictionary that doesn't exist
CompressionDictionary nonExistentById =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, new DictId(Kind.ZSTD, 999L));
+ TEST_KEYSPACE, TEST_TABLE, 999L);
assertThat(nonExistentById)
.as("Should return null for non-existent dictionary ID")
diff --git
a/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
new file mode 100644
index 0000000000..bd806ef953
--- /dev/null
+++
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.nio.file.Files;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.cassandra.cql3.CQLTester;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.ToolRunner.ToolResult;
+import org.apache.cassandra.utils.JsonUtils;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
+import static org.apache.cassandra.utils.JsonUtils.serializeToJsonFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
 * End-to-end tests for the {@code nodetool compressiondictionary} export / import / list
 * sub-commands: trains a real dictionary over JMX, exports it to a JSON file, re-imports
 * it (possibly retargeted at another table), and verifies both happy paths and the
 * server-side / client-side validation failures.
 */
public class ExportImportListCompressionDictionaryTest extends CQLTester
{
    @BeforeClass
    public static void setup() throws Throwable
    {
        requireNetwork();
        // nodetool talks to the node over JMX
        startJMXServer();
    }

    /**
     * Happy path: export a dictionary trained on one table, retarget the JSON at a
     * second table, import it, and verify 'list' shows it for both tables.
     */
    @Test
    public void testExportImportListCompressionDictionary() throws Throwable
    {
        // create table, train dictionary for it, export it to json file
        String firstTable = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(firstTable);

        String secondTable = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        rewriteTable(secondTable, pair);

        importDictionary(pair.right);

        list(pair.left, firstTable);
        list(pair.left, secondTable);
    }

    /**
     * Exporting by explicit --id must work for each of several trained dictionaries.
     */
    @Test
    public void testExportingSpecificDictionary() throws Throwable
    {
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(table);
        Pair<CompressionDictionaryDataObject, File> pair2 = trainAndExport(table);

        export(table, pair.left.dictId);
        export(table, pair2.left.dictId);
    }

    /**
     * Importing a JSON that names a non-existent keyspace/table is rejected server-side.
     */
    @Test
    public void testInvalidKeyspaceTable() throws Throwable
    {
        // create table, train dictionary for it, export it to json file
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(table);

        // test non-existing keyspace / table
        serializeToJsonFile(new CompressionDictionaryDataObject("abc",
                                                                "def",
                                                                pair.left.dictId,
                                                                pair.left.dict,
                                                                pair.left.kind,
                                                                pair.left.dictChecksum,
                                                                pair.left.dictLength), pair.right);

        ToolResult result = invokeNodetool("compressiondictionary", "import", pair.right.absolutePath());
        assertTrue(result.getStderr().contains("Unable to import dictionary JSON: Table abc.def does not exist or does not support dictionary compression"));
    }

    /**
     * A structurally invalid JSON (missing 'table') fails client-side validation
     * before any JMX call is made.
     */
    @Test
    public void testValidationOnClient() throws Throwable
    {
        // create table, train dictionary for it, export it to json file
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(table);

        // remove table on purpose will trigger client-side validation
        // and it fails to even reach Cassandra
        JsonNode jsonNode = JsonUtils.JSON_OBJECT_MAPPER.readTree(Files.readString(pair.right.toPath()));
        ObjectNode node = (ObjectNode) jsonNode;
        node.remove("table");

        File jsonWithoutTable = FileUtils.createTempFile("zstd-dictionary-", ".dict");
        JsonUtils.JSON_OBJECT_MAPPER.writeValue(jsonWithoutTable.toJavaIOFile(), node);

        ToolResult result = invokeNodetool("compressiondictionary", "import", jsonWithoutTable.absolutePath());
        assertTrue(result.getStderr().contains("Unable to import dictionary JSON: Table not specified."));
    }

    /**
     * Once a newer dictionary id has been imported for a table, importing an older
     * one is rejected with a descriptive error.
     */
    @Test
    public void testNotImportingOlderThanLatest() throws Throwable
    {
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(table);
        Pair<CompressionDictionaryDataObject, File> pair2 = trainAndExport(table);

        String newTable = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        rewriteTable(newTable, pair);
        rewriteTable(newTable, pair2);

        // import newer first
        ToolResult result = invokeNodetool("compressiondictionary", "import", pair2.right.absolutePath());
        result.assertOnCleanExit();
        // older will not be possible to import
        result = invokeNodetool("compressiondictionary", "import", pair.right.absolutePath());
        assertTrue(result.getStderr().contains(format("Unable to import dictionary JSON: Dictionary to import has older dictionary id " +
                                                      "(%s) than the latest compression dictionary (%s) " +
                                                      "for table %s.%s",
                                                      pair.left.dictId,
                                                      pair2.left.dictId,
                                                      keyspace(), newTable)));
    }

    /**
     * Import must fail (exit code 1, explanatory stderr) when the target table has
     * had compression disabled since the export.
     */
    @Test
    public void testImportingIntoTableWithDisabledCompression() throws Throwable
    {
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
        Pair<CompressionDictionaryDataObject, File> pair = trainAndExport(table);

        alterTable("ALTER TABLE %s WITH compression = {'enabled': false}");

        ToolResult result = invokeNodetool("compressiondictionary", "import", pair.right.absolutePath());
        Assert.assertEquals(1, result.getExitCode());
        assertEquals(format("Unable to import dictionary JSON: Table %s.%s does not exist or does not support dictionary compression\n",
                            keyspace(),
                            table),
                     result.getStderr());
    }

    // Runs 'nodetool compressiondictionary import <file>' and expects a clean exit.
    private void importDictionary(File dictFile)
    {
        ToolResult result = invokeNodetool("compressiondictionary", "import", dictFile.absolutePath());
        result.assertOnCleanExit();
    }

    // Runs 'list' for the table and asserts the expected row (space-separated columns) is printed.
    private void list(CompressionDictionaryDataObject dataObject, String table)
    {
        ToolResult result = invokeNodetool("compressiondictionary", "list", keyspace(), table);
        result.assertOnExitCode();
        assertTrue(result.getStdout()
                         .contains(format("%s %s %s %s %s %s",
                                          keyspace(), table, dataObject.dictId,
                                          dataObject.kind, dataObject.dictChecksum,
                                          dataObject.dictLength)));
    }

    // Rewrites the exported JSON in place so it targets a different table (same keyspace, same payload).
    private void rewriteTable(String table, Pair<CompressionDictionaryDataObject, File> pair) throws Throwable
    {
        serializeToJsonFile(new CompressionDictionaryDataObject(pair.left.keyspace,
                                                                table,
                                                                pair.left.dictId,
                                                                pair.left.dict,
                                                                pair.left.kind,
                                                                pair.left.dictChecksum,
                                                                pair.left.dictLength),
                            pair.right);
    }

    // Trains a dictionary for the table, exports the latest one, and returns (parsed JSON, file).
    private Pair<CompressionDictionaryDataObject, File> trainAndExport(String table) throws Throwable
    {
        trainDictionary(table);
        return export(table, null);
    }

    /**
     * Exports the table's dictionary to a temp file, optionally by explicit id
     * (id == null means "latest"), and returns the deserialized data object with the file.
     */
    private Pair<CompressionDictionaryDataObject, File> export(String table, Long id) throws Throwable
    {
        File dictionaryFile = FileUtils.createTempFile("zstd-dictionary-", ".dict");
        ToolResult result;
        if (id != null)
        {
            result = invokeNodetool("compressiondictionary", "export", keyspace(), table, dictionaryFile.absolutePath(), "--id", id.toString());
            result.assertOnCleanExit();
        }
        else
        {
            result = invokeNodetool("compressiondictionary", "export", keyspace(), table, dictionaryFile.absolutePath());
            result.assertOnCleanExit();
        }
        CompressionDictionaryDataObject dataObject = JsonUtils.deserializeFromJsonFile(CompressionDictionaryDataObject.class, dictionaryFile);

        assertTrue(dictionaryFile.exists());
        assertTrue(dictionaryFile.length() > 0);

        return Pair.create(dataObject, dictionaryFile);
    }

    // Writes SSTables and triggers 'train --force', asserting the success message mentions keyspace and table.
    private void trainDictionary(String table)
    {
        createSSTables();

        // Test training command with --force since we have limited test data
        ToolResult result = invokeNodetool("compressiondictionary", "train", "--force", keyspace(), table);
        result.assertOnCleanExit();

        assertThat(result.getStdout())
        .as("Should indicate training completed")
        .contains("Training completed successfully")
        .contains(keyspace())
        .contains(table);
    }

    // Produces 10 SSTables of 1000 rows each so the trainer has sample data to read.
    private void createSSTables()
    {
        for (int file = 0; file < 10; file++)
        {
            int batchSize = 1000;
            for (int i = 0; i < batchSize; i++)
            {
                int index = i + file * batchSize;
                execute("INSERT INTO %s (id, data) VALUES (?, ?)", index, "test data " + index);
            }

            flush();
        }
    }
}
diff --git
a/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
new file mode 100644
index 0000000000..d20c02f847
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.cassandra.db.compression.CompressionDictionary;
+import org.apache.cassandra.db.compression.CompressionDictionaryTrainingConfig;
+import org.apache.cassandra.db.compression.ZstdDictionaryTrainer;
+import org.apache.cassandra.db.marshal.UTF8Type;
+
+/**
+ * Simple generator of random data for Zstd compression dictionary and
dictionary trainer.
+ */
+public class CompressionDictionaryHelper
+{
+ public static final CompressionDictionaryHelper INSTANCE = new
CompressionDictionaryHelper();
+ private static final Random random = new Random();
+
+ private static final String[] dates = new String[]
{"2025-10-20","2025-10-19","2025-10-18","2025-10-17","2025-10-16"};
+ private static final String[] times = new String[]
{"11:00:01","11:00:02","11:00:03","11:00:04","11:00:05"};
+ private static final String[] levels = new String[] {"TRACE", "DEBUG",
"INFO", "WARN", "ERROR"};
+ private static final String[] services = new String[]
{"com.example.UserService", "com.example.DatabasePool",
"com.example.PaymentService", "com.example.OrderService"};
+
+ public String getRandomSample()
+ {
+ return dates[random.nextInt(dates.length)] + ' ' +
+ times[random.nextInt(times.length)] + ' ' +
+ levels[random.nextInt(levels.length)] + ' ' +
+ services[random.nextInt(services.length)] + ' ' +
+ UUID.randomUUID(); // message
+ }
+
+ public CompressionDictionary trainDictionary(String keyspace, String table)
+ {
+ CompressionDictionaryTrainingConfig config =
CompressionDictionaryTrainingConfig
+ .builder()
+ .maxDictionarySize(65536)
+ .maxTotalSampleSize(1024
* 1024) // 1MB total
+ .build();
+
+ try (ZstdDictionaryTrainer trainer = new
ZstdDictionaryTrainer(keyspace, table, config, 3))
+ {
+ trainer.start(true);
+ for (int i = 0; i < 25000; i++)
+ {
+
trainer.addSample(UTF8Type.instance.fromString(CompressionDictionaryHelper.INSTANCE.getRandomSample()));
+ }
+
+ return trainer.trainDictionary(false);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]