This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bff083e  CASSANDRA-19900: Make the compression cache configurable to reduce heap pressure for large SSTables (#77)
bff083e is described below

commit bff083e35ebc338daf93c1a3553c590ae1864115
Author: jberragan <jberra...@gmail.com>
AuthorDate: Sun Sep 8 08:13:57 2024 -0700

    CASSANDRA-19900: Make the compression cache configurable to reduce heap pressure for large SSTables (#77)
    
    
    Patch by James Berragan; Reviewed by Francisco Guerrero; Yifan Cai for CASSANDRA-19900
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/spark/utils/Properties.java   |  2 +
 .../java/org/apache/cassandra/clients/Sidecar.java | 38 +++++++--
 .../cassandra/spark/data/CassandraDataLayer.java   | 12 ++-
 .../spark/data/SidecarProvisionedSSTable.java      |  8 +-
 .../cassandra/clients/SidecarClientConfigTest.java | 20 +++++
 .../spark/data/SidecarProvisionedSSTableTest.java  | 96 +++++++++++++++++++++-
 7 files changed, 166 insertions(+), 11 deletions(-)
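
The new knobs are read from the same options map as the existing Sidecar client
settings, with the key constants defined in Sidecar.ClientConfig and the defaults in
Properties. A minimal sketch of exercising them directly is shown below; it mirrors
the SidecarClientConfigTest added in this patch. The class name and main method are
illustrative only, and the lower-cased option keys follow the Spark-style spelling
used in those tests:

    import com.google.common.collect.ImmutableMap;

    import org.apache.cassandra.clients.Sidecar;

    public class CompressionCacheOptionsExample
    {
        public static void main(String[] args)
        {
            // Defaults: caching enabled, 8 MiB ceiling per CompressionInfo component
            Sidecar.ClientConfig defaults = Sidecar.ClientConfig.create(ImmutableMap.of());
            System.out.println(defaults.cacheCompressionMetadata());              // true
            System.out.println(defaults.maxSizeCacheCompressionMetadataBytes());  // 8388608

            // Overrides, using the same option spellings as the new unit tests
            Sidecar.ClientConfig tuned = Sidecar.ClientConfig.create(ImmutableMap.of(
                "cachecompressionmetadata", "false",
                "maxsizecachecompressionmetadatabytes", "4194304"));
            System.out.println(tuned.cacheCompressionMetadata());                 // false
            System.out.println(tuned.maxSizeCacheCompressionMetadataBytes());     // 4194304
        }
    }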

diff --git a/CHANGES.txt b/CHANGES.txt
index 9c86723..e2916ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Make the compression cache configurable to reduce heap pressure for large SSTables (CASSANDRA-19900)
  * Refactor TokenRangeMapping to use proper types instead of Strings (CASSANDRA-19901)
  * Removes checks for blocked instances from bulk-write path (CASSANDRA-19873)
  * Fix consistency level check for write (CASSANDRA-19842)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
index 06c0056..6597f85 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
@@ -40,6 +40,8 @@ public final class Properties
     public static final int DEFAULT_MAX_RETRIES = 10;
     public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
     public static final int DEFAULT_MAX_POOL_SIZE = 64;
+    public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true;
+    public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB
     public static final long DEFAULT_MAX_BUFFER_SIZE = 6 * MEBI_BYTES;
     public static final long DEFAULT_CHUNK_BUFFER_SIZE = 4 * MEBI_BYTES;
     public static final Map<FileType, Long> DEFAULT_MAX_BUFFER_OVERRIDE = ImmutableMap.of(
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index e692ab2..6d558dd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -58,10 +58,12 @@ import org.apache.cassandra.spark.validation.SslValidation;
 import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.cassandra.spark.validation.TrustStoreValidation;
 
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CACHE_COMPRESSION_METADATA;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
@@ -210,6 +212,8 @@ public final class Sidecar
         public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes";
         public static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
         public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
+        public static final String CACHE_COMPRESSION_METADATA_KEY = "cacheCompressionMetadata";
+        public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxSizeCacheCompressionMetadataBytes";
 
         private final int userProvidedPort;
         private final int maxRetries;
@@ -221,6 +225,8 @@ public final class Sidecar
         private final long chunkSize;
         private final Map<FileType, Long> maxBufferOverride;
         private final Map<FileType, Long> chunkBufferOverride;
+        private final boolean cacheCompressionMetadata;
+        private final long maxSizeCacheCompressionMetadataBytes;
 
         // CHECKSTYLE IGNORE: Constructor with many parameters
         private ClientConfig(int userProvidedPort,
@@ -232,7 +238,9 @@ public final class Sidecar
                              int maxPoolSize,
                              int timeoutSeconds,
                              Map<FileType, Long> maxBufferOverride,
-                             Map<FileType, Long> chunkBufferOverride)
+                             Map<FileType, Long> chunkBufferOverride,
+                             boolean cacheCompressionMetadata,
+                             long maxSizeCacheCompressionMetadataBytes)
         {
             this.userProvidedPort = userProvidedPort;
             this.maxRetries = maxRetries;
@@ -244,6 +252,8 @@ public final class Sidecar
             this.timeoutSeconds = timeoutSeconds;
             this.maxBufferOverride = maxBufferOverride;
             this.chunkBufferOverride = chunkBufferOverride;
+            this.cacheCompressionMetadata = cacheCompressionMetadata;
+            this.maxSizeCacheCompressionMetadataBytes = maxSizeCacheCompressionMetadataBytes;
         }
 
         public int userProvidedPort()
@@ -332,7 +342,9 @@ public final class Sidecar
                                        DEFAULT_MAX_POOL_SIZE,
                                        DEFAULT_TIMEOUT_SECONDS,
                                        DEFAULT_MAX_BUFFER_OVERRIDE,
-                                       DEFAULT_CHUNK_BUFFER_OVERRIDE);
+                                       DEFAULT_CHUNK_BUFFER_OVERRIDE,
+                                       DEFAULT_CACHE_COMPRESSION_METADATA,
+                                       DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES);
         }
 
         public static ClientConfig create(Map<String, String> options)
@@ -347,7 +359,9 @@ public final class Sidecar
                          MapUtils.getInt(options, MAX_POOL_SIZE_KEY, DEFAULT_MAX_POOL_SIZE),
                          MapUtils.getInt(options, TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS),
                          buildMaxBufferOverride(options, DEFAULT_MAX_BUFFER_OVERRIDE),
-                          buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE)
+                          buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE),
+                          MapUtils.getBoolean(options, CACHE_COMPRESSION_METADATA_KEY, DEFAULT_CACHE_COMPRESSION_METADATA),
+                          MapUtils.getLong(options, MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY, DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES)
             );
         }
 
@@ -387,7 +401,9 @@ public final class Sidecar
                                           int maxPoolSize,
                                           int timeoutSeconds,
                                          Map<FileType, Long> maxBufferOverride,
-                                          Map<FileType, Long> chunkBufferOverride)
+                                          Map<FileType, Long> chunkBufferOverride,
+                                          boolean cacheCompressionMetadata,
+                                          long maxSizeCacheCompressionMetadataBytes)
         {
             return new ClientConfig(userProvidedPort,
                                     maxRetries,
@@ -398,7 +414,19 @@ public final class Sidecar
                                     maxPoolSize,
                                     timeoutSeconds,
                                     maxBufferOverride,
-                                    chunkBufferOverride);
+                                    chunkBufferOverride,
+                                    cacheCompressionMetadata,
+                                    maxSizeCacheCompressionMetadataBytes);
+        }
+
+        public long maxSizeCacheCompressionMetadataBytes()
+        {
+            return maxSizeCacheCompressionMetadataBytes;
+        }
+
+        public boolean cacheCompressionMetadata()
+        {
+            return cacheCompressionMetadata;
         }
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 114b5dd..5e26deb 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -696,7 +696,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                                in.readInt(),
                                                                in.readInt(),
                                                                (Map<FileType, Long>) in.readObject(),
-                                                               (Map<FileType, Long>) in.readObject());
+                                                               (Map<FileType, Long>) in.readObject(),
+                                                               in.readBoolean(),
+                                                               in.readLong());
         this.sslConfig = (SslConfig) in.readObject();
 
         this.cqlTable = bridge.javaDeserialize(in, CqlTable.class);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
@@ -748,6 +750,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         out.writeInt(this.sidecarClientConfig.timeoutSeconds());
         out.writeObject(this.sidecarClientConfig.maxBufferOverride());
         out.writeObject(this.sidecarClientConfig.chunkBufferOverride());
+        out.writeBoolean(this.sidecarClientConfig.cacheCompressionMetadata());
+        out.writeLong(this.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
         out.writeObject(this.sslConfig);
         bridge.javaSerialize(out, this.cqlTable);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
         out.writeObject(this.tokenPartitioner);
@@ -816,6 +820,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             out.writeInt(dataLayer.sidecarClientConfig.timeoutSeconds());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.maxBufferOverride());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.chunkBufferOverride());
+            out.writeBoolean(dataLayer.sidecarClientConfig.cacheCompressionMetadata());
+            out.writeLong(dataLayer.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
             kryo.writeObjectOrNull(out, dataLayer.sslConfig, SslConfig.class);
             kryo.writeObject(out, dataLayer.cqlTable);
             kryo.writeObject(out, dataLayer.tokenPartitioner);
@@ -865,7 +871,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                         in.readInt(),
                                         in.readInt(),
                                         (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
+                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
+                                        in.readBoolean(),
+                                        in.readLong()),
             kryo.readObjectOrNull(in, SslConfig.class),
             kryo.readObject(in, CqlTable.class),
             kryo.readObject(in, TokenPartitioner.class),
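
Both the plain Java serialization path and the Kryo serializer above write the two
new values at a fixed position and read them back at the same position. A small
self-contained illustration of why the write and read order must stay in lock-step
(the stream setup and values here are illustrative, not the project's code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FieldOrderSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer))
            {
                // Write side: the new fields are appended in a fixed order
                out.writeBoolean(true);           // cacheCompressionMetadata
                out.writeLong(8L * 1024 * 1024);  // maxSizeCacheCompressionMetadataBytes
            }

            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())))
            {
                // Read side must consume the same types in the same order, which is
                // why the reader, writer and Kryo serializer all change together here
                boolean cacheCompressionMetadata = in.readBoolean();
                long maxSizeCacheCompressionMetadataBytes = in.readLong();
                System.out.println(cacheCompressionMetadata + " " + maxSizeCacheCompressionMetadataBytes);
            }
        }
    }
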
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index 418604f..d065302 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.io.IOUtils;
@@ -51,7 +52,8 @@ import org.jetbrains.annotations.Nullable;
 public class SidecarProvisionedSSTable extends SSTable
 {
     private static final long serialVersionUID = 6452703925812602832L;
-    private static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder()
+    @VisibleForTesting
+    public static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder()
                                                                                .expireAfterAccess(1, TimeUnit.HOURS)
                                                                                .maximumSize(2048)
                                                                                .build();
@@ -151,7 +153,9 @@ public class SidecarProvisionedSSTable extends SSTable
             return null;
         }
 
-        if (fileType == FileType.COMPRESSION_INFO)
+        if (fileType == FileType.COMPRESSION_INFO
+            && sidecarClientConfig.cacheCompressionMetadata()
+            && snapshotFile.size < sidecarClientConfig.maxSizeCacheCompressionMetadataBytes())
         {
             String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
             byte[] bytes;
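
The guard above means CompressionInfo components are only served from the in-memory
Guava cache when caching is enabled and the component is smaller than the configured
ceiling; anything larger, or any component when caching is disabled, falls through to
the streaming path. A simplified, self-contained sketch of that pattern follows; the
cache settings mirror the COMPRESSION_CACHE declaration in SidecarProvisionedSSTable,
while the class, method, and parameter names are stand-ins rather than the project's
API:

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class CompressionMetadataCacheSketch
    {
        // Same shape as COMPRESSION_CACHE: entries expire an hour after last access,
        // and at most 2048 entries are retained
        private static final Cache<String, byte[]> CACHE = CacheBuilder.newBuilder()
                                                                       .expireAfterAccess(1, TimeUnit.HOURS)
                                                                       .maximumSize(2048)
                                                                       .build();

        static byte[] compressionMetadata(String key,
                                          long componentSizeBytes,
                                          boolean cacheCompressionMetadata,
                                          long maxSizeCacheCompressionMetadataBytes,
                                          Callable<byte[]> download) throws Exception
        {
            // The new guard: only sufficiently small components are cached, so a few
            // very large CompressionInfo files cannot pin large amounts of heap
            if (cacheCompressionMetadata && componentSizeBytes < maxSizeCacheCompressionMetadataBytes)
            {
                return CACHE.get(key, download);  // loads on a miss, then serves from memory
            }
            return download.call();               // oversized or caching disabled: always re-fetch
        }

        public static void main(String[] args) throws Exception
        {
            byte[] bytes = compressionMetadata("host/ks/tbl/snap/na-1-big-CompressionInfo.db",
                                               1024L, true, 8L * 1024 * 1024,
                                               () -> new byte[]{1, 2, 3});
            System.out.println(bytes.length);  // 3, and the entry is now cached
        }
    }
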
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
index bff83f3..2985c5f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.spark.bulkwriter.DataTransport;
 
 import static org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -114,4 +115,23 @@ public class SidecarClientConfigTest
         userAgentStr = Sidecar.transportModeBasedWriterUserAgent(DataTransport.S3_COMPAT);
         assertTrue(userAgentStr.endsWith(" writer-s3"));
     }
+
+    @Test
+    public void testCompressionCacheDefaults()
+    {
+        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of());
+        assertTrue(clientConfig.cacheCompressionMetadata());
+        assertEquals(8L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
+    }
+
+    @Test
+    public void testCompressionCache()
+    {
+        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of(
+        "cachecompressionmetadata", "false",
+        "maxsizecachecompressionmetadatabytes", "4194304")
+        );
+        assertFalse(clientConfig.cacheCompressionMetadata());
+        assertEquals(4L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index 4b5ec5e..edcdd4e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -19,8 +19,12 @@
 
 package org.apache.cassandra.spark.data;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -31,9 +35,13 @@ import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -142,12 +150,88 @@ class SidecarProvisionedSSTableTest
         .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName));
     }
 
+    private static final byte[] ARRAY = new byte[]{'a', 'b', 'c'};
+
+    @Test
+    public void testCompressionCache()
+    {
+        SSTable ssTable = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
+        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
+        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
+        try (InputStream is = ssTable.openCompressionStream())
+        {
+            assertArrayEquals(ARRAY, IOUtils.toByteArray(is));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testCompressionCacheDisabled()
+    {
+        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("cachecompressionmetadata", "false")),
+                                       "localhost1", 9043,
+                                       "keyspace1", "table1",
+                                       "snapshot1", "na-1-big-Data.db",
+                                       524288);
+        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
+        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
+        try (InputStream is = ssTable.openCompressionStream())
+        {
+            assertNotNull(is);
+            // when not cached it should return a BufferingInputStream
+            // but in the tests it's backed by nothing so don't consume
+            assertInstanceOf(BufferingInputStream.class, is);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testCompressionCacheTooLarge()
+    {
+        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("maxsizecachecompressionmetadatabytes", "4194304")),
+                                       "localhost1", 9043,
+                                       "keyspace1", "table1",
+                                       "snapshot1", "na-1-big-Data.db",
+                                       5 * 1025 * 1024);
+        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
+        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
+        try (InputStream is = ssTable.openCompressionStream())
+        {
+            assertNotNull(is);
+            // when not cached it should return a BufferingInputStream
+            // but in the tests it's backed by nothing so don't consume
+            assertInstanceOf(BufferingInputStream.class, is);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     SSTable prepareTable(String sidecarHostName,
                          int sidecarPort,
                          String keyspace,
                          String table,
                          String snapshot,
                          String dataFileName)
+    {
+        return prepareTable(this.sidecarClientConfig, sidecarHostName, sidecarPort, keyspace, table, snapshot, dataFileName, 5);
+    }
+
+    SSTable prepareTable(Sidecar.ClientConfig clientConfig,
+                         String sidecarHostName,
+                         int sidecarPort,
+                         String keyspace,
+                         String table,
+                         String snapshot,
+                         String dataFileName,
+                         int compressionFileSize)
     {
         ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(5,
                                                                                              sidecarHostName,
@@ -157,13 +241,21 @@ class SidecarProvisionedSSTableTest
                                                                                              keyspace,
                                                                                              table + "-abc1234",
                                                                                              dataFileName);
+        ListSnapshotFilesResponse.FileInfo compressionInfo = new ListSnapshotFilesResponse.FileInfo(compressionFileSize,
+                                                                                                    sidecarHostName,
+                                                                                                    sidecarPort,
+                                                                                                    1,
+                                                                                                    snapshot,
+                                                                                                    keyspace,
+                                                                                                    table + "-abc1234",
+                                                                                                    dataFileName);
         return new SidecarProvisionedSSTable(mockSidecarClient,
-                                             sidecarClientConfig,
+                                             clientConfig,
                                              new SidecarInstanceImpl(sidecarHostName, sidecarPort),
                                              keyspace,
                                              table,
                                              snapshot,
-                                             Collections.singletonMap(FileType.DATA, fileInfo),
+                                             ImmutableMap.of(FileType.DATA, fileInfo, FileType.COMPRESSION_INFO, compressionInfo),
                                              1,
                                              Stats.DoNothingStats.INSTANCE);
     }

