This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new bff083e CASSANDRA-19900: Make the compression cache configurable to reduce heap pressure for large SSTables (#77) bff083e is described below commit bff083e35ebc338daf93c1a3553c590ae1864115 Author: jberragan <jberra...@gmail.com> AuthorDate: Sun Sep 8 08:13:57 2024 -0700 CASSANDRA-19900: Make the compression cache configurable to reduce heap pressure for large SSTables (#77) Patch by James Berragan; Reviewed by Francisco Guerrero; Yifan Cai for CASSANDRA-19900 --- CHANGES.txt | 1 + .../apache/cassandra/spark/utils/Properties.java | 2 + .../java/org/apache/cassandra/clients/Sidecar.java | 38 +++++++-- .../cassandra/spark/data/CassandraDataLayer.java | 12 ++- .../spark/data/SidecarProvisionedSSTable.java | 8 +- .../cassandra/clients/SidecarClientConfigTest.java | 20 +++++ .../spark/data/SidecarProvisionedSSTableTest.java | 96 +++++++++++++++++++++- 7 files changed, 166 insertions(+), 11 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9c86723..e2916ba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Make the compression cache configurable to reduce heap pressure for large SSTables (CASSANDRA-19900) * Refactor TokenRangeMapping to use proper types instead of Strings (CASSANDRA-19901) * Removes checks for blocked instances from bulk-write path (CASSANDRA-19873) * Fix consistency level check for write (CASSANDRA-19842) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java index 06c0056..6597f85 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java @@ -40,6 +40,8 @@ public final class Properties public static final int DEFAULT_MAX_RETRIES = 10; public static final long 
DEFAULT_MILLIS_TO_SLEEP = 500; public static final int DEFAULT_MAX_POOL_SIZE = 64; + public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true; + public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB public static final long DEFAULT_MAX_BUFFER_SIZE = 6 * MEBI_BYTES; public static final long DEFAULT_CHUNK_BUFFER_SIZE = 4 * MEBI_BYTES; public static final Map<FileType, Long> DEFAULT_MAX_BUFFER_OVERRIDE = ImmutableMap.of( diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java index e692ab2..6d558dd 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java @@ -58,10 +58,12 @@ import org.apache.cassandra.spark.validation.SslValidation; import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.cassandra.spark.validation.TrustStoreValidation; +import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CACHE_COMPRESSION_METADATA; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE; +import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES; @@ -210,6 +212,8 @@ public final class Sidecar public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes"; 
public static final String MAX_POOL_SIZE_KEY = "maxPoolSize"; public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds"; + public static final String CACHE_COMPRESSION_METADATA_KEY = "cacheCompressionMetadata"; + public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxSizeCacheCompressionMetadataBytes"; private final int userProvidedPort; private final int maxRetries; @@ -221,6 +225,8 @@ public final class Sidecar private final long chunkSize; private final Map<FileType, Long> maxBufferOverride; private final Map<FileType, Long> chunkBufferOverride; + private final boolean cacheCompressionMetadata; + private final long maxSizeCacheCompressionMetadataBytes; // CHECKSTYLE IGNORE: Constructor with many parameters private ClientConfig(int userProvidedPort, @@ -232,7 +238,9 @@ public final class Sidecar int maxPoolSize, int timeoutSeconds, Map<FileType, Long> maxBufferOverride, - Map<FileType, Long> chunkBufferOverride) + Map<FileType, Long> chunkBufferOverride, + boolean cacheCompressionMetadata, + long maxSizeCacheCompressionMetadataBytes) { this.userProvidedPort = userProvidedPort; this.maxRetries = maxRetries; @@ -244,6 +252,8 @@ public final class Sidecar this.timeoutSeconds = timeoutSeconds; this.maxBufferOverride = maxBufferOverride; this.chunkBufferOverride = chunkBufferOverride; + this.cacheCompressionMetadata = cacheCompressionMetadata; + this.maxSizeCacheCompressionMetadataBytes = maxSizeCacheCompressionMetadataBytes; } public int userProvidedPort() @@ -332,7 +342,9 @@ public final class Sidecar DEFAULT_MAX_POOL_SIZE, DEFAULT_TIMEOUT_SECONDS, DEFAULT_MAX_BUFFER_OVERRIDE, - DEFAULT_CHUNK_BUFFER_OVERRIDE); + DEFAULT_CHUNK_BUFFER_OVERRIDE, + DEFAULT_CACHE_COMPRESSION_METADATA, + DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES); } public static ClientConfig create(Map<String, String> options) @@ -347,7 +359,9 @@ public final class Sidecar MapUtils.getInt(options, MAX_POOL_SIZE_KEY, DEFAULT_MAX_POOL_SIZE), MapUtils.getInt(options, 
TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS), buildMaxBufferOverride(options, DEFAULT_MAX_BUFFER_OVERRIDE), - buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE) + buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE), + MapUtils.getBoolean(options, CACHE_COMPRESSION_METADATA_KEY, DEFAULT_CACHE_COMPRESSION_METADATA), + MapUtils.getLong(options, MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY, DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES) ); } @@ -387,7 +401,9 @@ public final class Sidecar int maxPoolSize, int timeoutSeconds, Map<FileType, Long> maxBufferOverride, - Map<FileType, Long> chunkBufferOverride) + Map<FileType, Long> chunkBufferOverride, + boolean cacheCompressionMetadata, + long maxSizeCacheCompressionMetadataBytes) { return new ClientConfig(userProvidedPort, maxRetries, @@ -398,7 +414,19 @@ public final class Sidecar maxPoolSize, timeoutSeconds, maxBufferOverride, - chunkBufferOverride); + chunkBufferOverride, + cacheCompressionMetadata, + maxSizeCacheCompressionMetadataBytes); + } + + public long maxSizeCacheCompressionMetadataBytes() + { + return maxSizeCacheCompressionMetadataBytes; + } + + public boolean cacheCompressionMetadata() + { + return cacheCompressionMetadata; } } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 114b5dd..5e26deb 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -696,7 +696,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV in.readInt(), in.readInt(), (Map<FileType, Long>) in.readObject(), - (Map<FileType, Long>) in.readObject()); + (Map<FileType, Long>) in.readObject(), + in.readBoolean(), + in.readLong()); this.sslConfig = (SslConfig) 
in.readObject(); this.cqlTable = bridge.javaDeserialize(in, CqlTable.class); // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge @@ -748,6 +750,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV out.writeInt(this.sidecarClientConfig.timeoutSeconds()); out.writeObject(this.sidecarClientConfig.maxBufferOverride()); out.writeObject(this.sidecarClientConfig.chunkBufferOverride()); + out.writeBoolean(this.sidecarClientConfig.cacheCompressionMetadata()); + out.writeLong(this.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes()); out.writeObject(this.sslConfig); bridge.javaSerialize(out, this.cqlTable); // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge out.writeObject(this.tokenPartitioner); @@ -816,6 +820,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV out.writeInt(dataLayer.sidecarClientConfig.timeoutSeconds()); kryo.writeObject(out, dataLayer.sidecarClientConfig.maxBufferOverride()); kryo.writeObject(out, dataLayer.sidecarClientConfig.chunkBufferOverride()); + out.writeBoolean(dataLayer.sidecarClientConfig.cacheCompressionMetadata()); + out.writeLong(dataLayer.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes()); kryo.writeObjectOrNull(out, dataLayer.sslConfig, SslConfig.class); kryo.writeObject(out, dataLayer.cqlTable); kryo.writeObject(out, dataLayer.tokenPartitioner); @@ -865,7 +871,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV in.readInt(), in.readInt(), (Map<FileType, Long>) kryo.readObject(in, HashMap.class), - (Map<FileType, Long>) kryo.readObject(in, HashMap.class)), + (Map<FileType, Long>) kryo.readObject(in, HashMap.class), + in.readBoolean(), + in.readLong()), kryo.readObjectOrNull(in, SslConfig.class), kryo.readObject(in, CqlTable.class), kryo.readObject(in, TokenPartitioner.class), diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java index 418604f..d065302 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.commons.io.IOUtils; @@ -51,7 +52,8 @@ import org.jetbrains.annotations.Nullable; public class SidecarProvisionedSSTable extends SSTable { private static final long serialVersionUID = 6452703925812602832L; - private static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder() + @VisibleForTesting + public static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.HOURS) .maximumSize(2048) .build(); @@ -151,7 +153,9 @@ public class SidecarProvisionedSSTable extends SSTable return null; } - if (fileType == FileType.COMPRESSION_INFO) + if (fileType == FileType.COMPRESSION_INFO + && sidecarClientConfig.cacheCompressionMetadata() + && snapshotFile.size < sidecarClientConfig.maxSizeCacheCompressionMetadataBytes()) { String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName); byte[] bytes; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java index bff83f3..2985c5f 100644 --- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java @@ -26,6 +26,7 @@ import org.apache.cassandra.spark.bulkwriter.DataTransport; import static org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -114,4 +115,23 @@ public class SidecarClientConfigTest userAgentStr = Sidecar.transportModeBasedWriterUserAgent(DataTransport.S3_COMPAT); assertTrue(userAgentStr.endsWith(" writer-s3")); } + + @Test + public void testCompressionCacheDefaults() + { + Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of()); + assertTrue(clientConfig.cacheCompressionMetadata()); + assertEquals(8L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes()); + } + + @Test + public void testCompressionCache() + { + Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of( + "cachecompressionmetadata", "false", + "maxsizecachecompressionmetadatabytes", "4194304") + ); + assertFalse(clientConfig.cacheCompressionMetadata()); + assertEquals(4L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes()); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java index 4b5ec5e..edcdd4e 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java @@ -19,8 +19,12 @@ package org.apache.cassandra.spark.data; +import java.io.IOException; +import 
java.io.InputStream; import java.util.Collections; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -31,9 +35,13 @@ import org.apache.cassandra.clients.Sidecar; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.spark.stats.Stats; +import org.apache.cassandra.spark.utils.streaming.BufferingInputStream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.mock; /** @@ -142,12 +150,88 @@ class SidecarProvisionedSSTableTest .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName)); } + private static final byte[] ARRAY = new byte[]{'a', 'b', 'c'}; + + @Test + public void testCompressionCache() + { + SSTable ssTable = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db"); + String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace1", "table1", "snapshot1", "na-1-big-Data.db"); + SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY); + try (InputStream is = ssTable.openCompressionStream()) + { + assertArrayEquals(ARRAY, IOUtils.toByteArray(is)); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Test + public void testCompressionCacheDisabled() + { + SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("cachecompressionmetadata", "false")), + "localhost1", 9043, + "keyspace1", "table1", + "snapshot1", "na-1-big-Data.db", + 524288); + String key = 
String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db"); + SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY); + try (InputStream is = ssTable.openCompressionStream()) + { + assertNotNull(is); + // when not cached it should return a BufferingInputStream + // but in the tests it's backed by nothing so don't consume + assertInstanceOf(BufferingInputStream.class, is); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Test + public void testCompressionCacheTooLarge() + { + SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("maxsizecachecompressionmetadatabytes", "4194304")), + "localhost1", 9043, + "keyspace1", "table1", + "snapshot1", "na-1-big-Data.db", + 5 * 1025 * 1024); + String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db"); + SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY); + try (InputStream is = ssTable.openCompressionStream()) + { + assertNotNull(is); + // when not cached it should return a BufferingInputStream + // but in the tests it's backed by nothing so don't consume + assertInstanceOf(BufferingInputStream.class, is); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + SSTable prepareTable(String sidecarHostName, int sidecarPort, String keyspace, String table, String snapshot, String dataFileName) + { + return prepareTable(this.sidecarClientConfig, sidecarHostName, sidecarPort, keyspace, table, snapshot, dataFileName, 5); + } + + SSTable prepareTable(Sidecar.ClientConfig clientConfig, + String sidecarHostName, + int sidecarPort, + String keyspace, + String table, + String snapshot, + String dataFileName, + int compressionFileSize) { ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(5, sidecarHostName, @@ -157,13 +241,21 @@ class SidecarProvisionedSSTableTest keyspace, table + "-abc1234", dataFileName); + 
ListSnapshotFilesResponse.FileInfo compressionInfo = new ListSnapshotFilesResponse.FileInfo(compressionFileSize, + sidecarHostName, + sidecarPort, + 1, + snapshot, + keyspace, + table + "-abc1234", + dataFileName); return new SidecarProvisionedSSTable(mockSidecarClient, - sidecarClientConfig, + clientConfig, new SidecarInstanceImpl(sidecarHostName, sidecarPort), keyspace, table, snapshot, - Collections.singletonMap(FileType.DATA, fileInfo), + ImmutableMap.of(FileType.DATA, fileInfo, FileType.COMPRESSION_INFO, compressionInfo), 1, Stats.DoNothingStats.INSTANCE); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org