This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 972535d  CASSANDRA-19927: Remove old compression cache and move to using cache of CompressionMetadata (#84)
972535d is described below

commit 972535d0f7cd828b7e0e40706adbe8897a436a5d
Author: jberragan <jberra...@gmail.com>
AuthorDate: Tue Sep 17 15:52:45 2024 -0700

    CASSANDRA-19927: Remove old compression cache and move to using cache of CompressionMetadata (#84)
    
    Remove the old compression cache and move to using a cache of CompressionMetadata, so that:
    - we no longer cache an entire byte array on heap
    - we cache and re-use the CompressionMetadata object so that only one BigLongArray object is allocated for the chunk offsets

    Patch by James Berragan; Reviewed by Yifan Cai and Francisco Guerrero for CASSANDRA-19927
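
    For context, the read path now resolves compression metadata through SSTableCache instead of
    caching raw CompressionInfo.db bytes per file. The fragment below is a minimal sketch of that
    lookup (illustrative only, not part of this commit), mirroring the SSTableReader change in the
    diff; it assumes it runs inside the org.apache.cassandra.spark.reader package with ssTable,
    version, stats and the dataStream field already in scope and IOException propagated:

        // Look up (or read and cache) the CompressionMetadata once per SSTable;
        // null means the CompressionInfo.db component does not exist (compression disabled).
        @Nullable
        CompressionMetadata compressionMetadata =
                SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
        DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());

        if (compressionMetadata != null)
        {
            // Compressed SSTable: re-use the cached chunk-offset metadata rather than
            // re-reading (or byte[]-caching) the CompressionInfo.db component.
            dataStream = CompressedRawInputStream.from(ssTable, dataInputStream, compressionMetadata, stats);
        }
        else
        {
            // Uncompressed SSTable: stream the Data.db component directly.
            dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
        }

    The new cache is tuned via system properties read by SSTableCache: sbr.cache.compressionInfo.enabled
    (default true), sbr.cache.compressionInfo.maxEntries (default 128),
    sbr.cache.compressionInfo.expireAfterMins (default 15) and sbr.cache.compressionInfo.maxSize
    (default 0, i.e. no per-file size limit); these replace the removed cacheCompressionMetadata and
    maxSizeCacheCompressionMetadataBytes client options.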
---
 .../apache/cassandra/spark/utils/Properties.java   |  2 -
 .../java/org/apache/cassandra/clients/Sidecar.java | 38 ++---------
 .../cassandra/spark/data/CassandraDataLayer.java   | 12 +---
 .../spark/data/SidecarProvisionedSSTable.java      | 31 ---------
 .../cassandra/clients/SidecarClientConfigTest.java | 20 ------
 .../spark/data/SidecarProvisionedSSTableTest.java  | 73 ----------------------
 .../spark/reader/CompressedRawInputStream.java     | 15 ++++-
 .../spark/reader/CompressionMetadata.java          |  2 +-
 .../apache/cassandra/spark/reader/IndexReader.java | 16 +----
 .../cassandra/spark/reader/SSTableCache.java       | 58 ++++++++++++++++-
 .../cassandra/spark/reader/SSTableReader.java      | 28 ++++-----
 .../cassandra/spark/reader/SSTableCacheTests.java  | 16 +++++
 12 files changed, 107 insertions(+), 204 deletions(-)

diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
index 6597f85..06c0056 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
@@ -40,8 +40,6 @@ public final class Properties
     public static final int DEFAULT_MAX_RETRIES = 10;
     public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
     public static final int DEFAULT_MAX_POOL_SIZE = 64;
-    public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true;
-    public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB
     public static final long DEFAULT_MAX_BUFFER_SIZE = 6 * MEBI_BYTES;
     public static final long DEFAULT_CHUNK_BUFFER_SIZE = 4 * MEBI_BYTES;
     public static final Map<FileType, Long> DEFAULT_MAX_BUFFER_OVERRIDE = ImmutableMap.of(
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 2f17040..fee8116 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -58,12 +58,10 @@ import org.apache.cassandra.spark.validation.SslValidation;
 import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.cassandra.spark.validation.TrustStoreValidation;
 
-import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CACHE_COMPRESSION_METADATA;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
-import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
@@ -212,8 +210,6 @@ public final class Sidecar
         public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes";
         public static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
         public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
-        public static final String CACHE_COMPRESSION_METADATA_KEY = "cacheCompressionMetadata";
-        public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxSizeCacheCompressionMetadataBytes";
 
         private final int userProvidedPort;
         private final int maxRetries;
@@ -225,8 +221,6 @@ public final class Sidecar
         private final long chunkSize;
         private final Map<FileType, Long> maxBufferOverride;
         private final Map<FileType, Long> chunkBufferOverride;
-        private final boolean cacheCompressionMetadata;
-        private final long maxSizeCacheCompressionMetadataBytes;
 
         // CHECKSTYLE IGNORE: Constructor with many parameters
         private ClientConfig(int userProvidedPort,
@@ -238,9 +232,7 @@ public final class Sidecar
                              int maxPoolSize,
                              int timeoutSeconds,
                              Map<FileType, Long> maxBufferOverride,
-                             Map<FileType, Long> chunkBufferOverride,
-                             boolean cacheCompressionMetadata,
-                             long maxSizeCacheCompressionMetadataBytes)
+                             Map<FileType, Long> chunkBufferOverride)
         {
             this.userProvidedPort = userProvidedPort;
             this.maxRetries = maxRetries;
@@ -252,8 +244,6 @@ public final class Sidecar
             this.timeoutSeconds = timeoutSeconds;
             this.maxBufferOverride = maxBufferOverride;
             this.chunkBufferOverride = chunkBufferOverride;
-            this.cacheCompressionMetadata = cacheCompressionMetadata;
-            this.maxSizeCacheCompressionMetadataBytes = maxSizeCacheCompressionMetadataBytes;
         }
 
         public int userProvidedPort()
@@ -342,9 +332,7 @@ public final class Sidecar
                                        DEFAULT_MAX_POOL_SIZE,
                                        DEFAULT_TIMEOUT_SECONDS,
                                        DEFAULT_MAX_BUFFER_OVERRIDE,
-                                       DEFAULT_CHUNK_BUFFER_OVERRIDE,
-                                       DEFAULT_CACHE_COMPRESSION_METADATA,
-                                       DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES);
+                                       DEFAULT_CHUNK_BUFFER_OVERRIDE);
         }
 
         public static ClientConfig create(Map<String, String> options)
@@ -359,9 +347,7 @@ public final class Sidecar
                           MapUtils.getInt(options, MAX_POOL_SIZE_KEY, DEFAULT_MAX_POOL_SIZE),
                           MapUtils.getInt(options, TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS),
                           buildMaxBufferOverride(options, DEFAULT_MAX_BUFFER_OVERRIDE),
-                          buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE),
-                          MapUtils.getBoolean(options, CACHE_COMPRESSION_METADATA_KEY, DEFAULT_CACHE_COMPRESSION_METADATA),
-                          MapUtils.getLong(options, MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY, DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES)
+                          buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE)
             );
         }
 
@@ -401,9 +387,7 @@ public final class Sidecar
                                           int maxPoolSize,
                                           int timeoutSeconds,
                                           Map<FileType, Long> maxBufferOverride,
-                                          Map<FileType, Long> chunkBufferOverride,
-                                          boolean cacheCompressionMetadata,
-                                          long maxSizeCacheCompressionMetadataBytes)
+                                          Map<FileType, Long> chunkBufferOverride)
         {
             return new ClientConfig(userProvidedPort,
                                     maxRetries,
@@ -414,19 +398,7 @@ public final class Sidecar
                                     maxPoolSize,
                                     timeoutSeconds,
                                     maxBufferOverride,
-                                    chunkBufferOverride,
-                                    cacheCompressionMetadata,
-                                    maxSizeCacheCompressionMetadataBytes);
-        }
-
-        public long maxSizeCacheCompressionMetadataBytes()
-        {
-            return maxSizeCacheCompressionMetadataBytes;
-        }
-
-        public boolean cacheCompressionMetadata()
-        {
-            return cacheCompressionMetadata;
+                                    chunkBufferOverride);
         }
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 369e58e..d997385 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -696,9 +696,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                                in.readInt(),
                                                                in.readInt(),
                                                                (Map<FileType, Long>) in.readObject(),
-                                                               (Map<FileType, Long>) in.readObject(),
-                                                               in.readBoolean(),
-                                                               in.readLong());
+                                                               (Map<FileType, Long>) in.readObject());
         this.sslConfig = (SslConfig) in.readObject();
 
         this.cqlTable = bridge.javaDeserialize(in, CqlTable.class);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
@@ -750,8 +748,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         out.writeInt(this.sidecarClientConfig.timeoutSeconds());
         out.writeObject(this.sidecarClientConfig.maxBufferOverride());
         out.writeObject(this.sidecarClientConfig.chunkBufferOverride());
-        out.writeBoolean(this.sidecarClientConfig.cacheCompressionMetadata());
-        out.writeLong(this.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
         out.writeObject(this.sslConfig);
         bridge.javaSerialize(out, this.cqlTable);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
         out.writeObject(this.tokenPartitioner);
@@ -820,8 +816,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             out.writeInt(dataLayer.sidecarClientConfig.timeoutSeconds());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.maxBufferOverride());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.chunkBufferOverride());
-            out.writeBoolean(dataLayer.sidecarClientConfig.cacheCompressionMetadata());
-            out.writeLong(dataLayer.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
             kryo.writeObjectOrNull(out, dataLayer.sslConfig, SslConfig.class);
             kryo.writeObject(out, dataLayer.cqlTable);
             kryo.writeObject(out, dataLayer.tokenPartitioner);
@@ -871,9 +865,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                         in.readInt(),
                                         in.readInt(),
                                         (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                        in.readBoolean(),
-                                        in.readLong()),
+                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
             kryo.readObjectOrNull(in, SslConfig.class),
             kryo.readObject(in, CqlTable.class),
             kryo.readObject(in, TokenPartitioner.class),
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index d065302..70523cd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -19,18 +19,10 @@
 
 package org.apache.cassandra.spark.data;
 
-import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.commons.io.IOUtils;
 
 import o.a.c.sidecar.client.shaded.common.response.ListSnapshotFilesResponse;
 import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
@@ -39,7 +31,6 @@ import org.apache.cassandra.clients.SidecarStreamConsumerAdapter;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.stats.Stats;
-import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
 import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
@@ -52,11 +43,6 @@ import org.jetbrains.annotations.Nullable;
 public class SidecarProvisionedSSTable extends SSTable
 {
     private static final long serialVersionUID = 6452703925812602832L;
-    @VisibleForTesting
-    public static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder()
-                                                                               .expireAfterAccess(1, TimeUnit.HOURS)
-                                                                               .maximumSize(2048)
-                                                                               .build();
     private final SidecarClient sidecar;
     private final SidecarInstance instance;
     private final Sidecar.ClientConfig sidecarClientConfig;
@@ -153,23 +139,6 @@ public class SidecarProvisionedSSTable extends SSTable
             return null;
         }
 
-        if (fileType == FileType.COMPRESSION_INFO
-            && sidecarClientConfig.cacheCompressionMetadata()
-            && snapshotFile.size < sidecarClientConfig.maxSizeCacheCompressionMetadataBytes())
-        {
-            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
-            byte[] bytes;
-            try
-            {
-                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType)));
-            }
-            catch (ExecutionException exception)
-            {
-                throw new RuntimeException(ThrowableUtils.rootCause(exception));
-            }
-            return new ByteArrayInputStream(bytes);
-        }
-
         return open(snapshotFile, fileType);
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
index 2985c5f..bff83f3 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.spark.bulkwriter.DataTransport;
 
 import static org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -115,23 +114,4 @@ public class SidecarClientConfigTest
         userAgentStr = Sidecar.transportModeBasedWriterUserAgent(DataTransport.S3_COMPAT);
         assertTrue(userAgentStr.endsWith(" writer-s3"));
     }
-
-    @Test
-    public void testCompressionCacheDefaults()
-    {
-        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of());
-        assertTrue(clientConfig.cacheCompressionMetadata());
-        assertEquals(8L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
-    }
-
-    @Test
-    public void testCompressionCache()
-    {
-        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of(
-        "cachecompressionmetadata", "false",
-        "maxsizecachecompressionmetadatabytes", "4194304")
-        );
-        assertFalse(clientConfig.cacheCompressionMetadata());
-        assertEquals(4L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
-    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index edcdd4e..21d2085 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -19,12 +19,7 @@
 
 package org.apache.cassandra.spark.data;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-
 import com.google.common.collect.ImmutableMap;
-import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -35,13 +30,9 @@ import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.spark.stats.Stats;
-import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -150,70 +141,6 @@ class SidecarProvisionedSSTableTest
         .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName));
     }
 
-    private static final byte[] ARRAY = new byte[]{'a', 'b', 'c'};
-
-    @Test
-    public void testCompressionCache()
-    {
-        SSTable ssTable = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertArrayEquals(ARRAY, IOUtils.toByteArray(is));
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testCompressionCacheDisabled()
-    {
-        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("cachecompressionmetadata", "false")),
-                                       "localhost1", 9043,
-                                       "keyspace1", "table1",
-                                       "snapshot1", "na-1-big-Data.db",
-                                       524288);
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertNotNull(is);
-            // when not cached it should return a BufferingInputStream
-            // but in the tests it's backed by nothing so don't consume
-            assertInstanceOf(BufferingInputStream.class, is);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testCompressionCacheTooLarge()
-    {
-        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("maxsizecachecompressionmetadatabytes", "4194304")),
-                                       "localhost1", 9043,
-                                       "keyspace1", "table1",
-                                       "snapshot1", "na-1-big-Data.db",
-                                       5 * 1025 * 1024);
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertNotNull(is);
-            // when not cached it should return a BufferingInputStream
-            // but in the tests it's backed by nothing so don't consume
-            assertInstanceOf(BufferingInputStream.class, is);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     SSTable prepareTable(String sidecarHostName,
                          int sidecarPort,
                          String keyspace,
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
index 03bf2d6..ca9f6da 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
@@ -87,11 +87,22 @@ public final class CompressedRawInputStream extends RawInputStream
                                                     InputStream compressionInfoInputStream,
                                                     boolean hasCompressedLength,
                                                     Stats stats) throws IOException
+    {
+        return from(ssTable,
+                    dataInputStream,
+                    CompressionMetadata.fromInputStream(compressionInfoInputStream,
+                                                        hasCompressedLength),
+                    stats);
+    }
+
+    static CompressedRawInputStream from(@Nullable SSTable ssTable,
+                                         DataInputStream dataInputStream,
+                                         CompressionMetadata compressionMetadata,
+                                         Stats stats)
     {
         return new CompressedRawInputStream(ssTable,
                                             dataInputStream,
-                                            CompressionMetadata.fromInputStream(compressionInfoInputStream,
-                                                                                hasCompressedLength),
+                                            compressionMetadata,
                                             stats);
     }
 
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
index 1c89853..2140ea1 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.spark.reader.common.BigLongArray;
  */
 // CompressionMetadata is mocked in IndexReaderTests and mockito does not support mocking final classes
 // CHECKSTYLE IGNORE: FinalClass
-class CompressionMetadata extends AbstractCompressionMetadata
+public class CompressionMetadata extends AbstractCompressionMetadata
 {
 
     private final CompressionParams parameters;
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
index e112b5f..2fe4541 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
@@ -96,7 +96,7 @@ public class IndexReader implements IIndexReader
             }
 
             // read CompressionMetadata if it exists
-            CompressionMetadata compressionMetadata = getCompressionMetadata(ssTable, version.hasMaxCompressedLength());
+            CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
             if (compressionMetadata != null)
             {
                 stats.indexCompressionFileRead(System.nanoTime() - now);
@@ -133,20 +133,6 @@ public class IndexReader implements IIndexReader
         }
     }
 
-    @Nullable
-    public static CompressionMetadata getCompressionMetadata(SSTable ssTable,
-                                                             boolean hasMaxCompressedLength) throws IOException
-    {
-        try (InputStream cis = ssTable.openCompressionStream())
-        {
-            if (cis != null)
-            {
-                return CompressionMetadata.fromInputStream(cis, hasMaxCompressedLength);
-            }
-        }
-        return null;
-    }
-
     @SuppressWarnings("InfiniteLoopStatement")
     static void consumePrimaryIndex(@NotNull IPartitioner partitioner,
                                     @NotNull InputStream primaryIndex,
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index f13d4d5..6e9079e 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -20,10 +20,13 @@
 package org.apache.cassandra.spark.reader;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -35,11 +38,13 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.Pair;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Basic cache to reduce wasteful requests on the DataLayer for cacheable SSTable metadata,
@@ -59,15 +64,35 @@ public class SSTableCache
                                                                                            propOrDefault("sbr.cache.stats.expireAfterMins", 60));
     private final Cache<SSTable, BloomFilter>                         filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384),
                                                                                            propOrDefault("sbr.cache.filter.expireAfterMins", 60));
+    // if compression is disabled then the CompressionInfo.db file will not exist
+    // therefore we can cache as Optional to a) avoid null errors in the cache and b) record that the component does not exist
+    private final Cache<SSTable, Optional<CompressionMetadata>>       compressionMetadata = buildCache(
+    propOrDefault("sbr.cache.compressionInfo.maxEntries", 128),
+    propOrDefault("sbr.cache.compressionInfo.expireAfterMins", 15));
 
     private static int propOrDefault(String name, int defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Integer::parseInt);
+    }
+
+    private static long propOrDefault(String name, long defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Long::parseLong);
+    }
+
+    private static boolean propOrDefault(String name, boolean defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Boolean::parseBoolean);
+    }
+
+    private static <T> T propOrDefault(String name, T defaultValue, Function<String, T> parser)
     {
         String str = System.getProperty(name);
         if (str != null)
         {
             try
             {
-                return Integer.parseInt(str);
+                return parser.apply(str);
             }
             catch (NumberFormatException exception)
             {
@@ -108,6 +133,32 @@ public class SSTableCache
         return get(filter, ssTable, () -> ReaderUtils.readFilter(ssTable, descriptor.version.hasOldBfFormat()));
     }
 
+    @Nullable
+    public CompressionMetadata compressionMetadata(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
+    {
+        if (propOrDefault("sbr.cache.compressionInfo.enabled", true))
+        {
+            long maxSize = propOrDefault("sbr.cache.compressionInfo.maxSize", 0L);
+            if (maxSize <= 0 || ssTable.length(FileType.COMPRESSION_INFO) < maxSize)
+            {
+                return get(compressionMetadata, ssTable, () -> readCompressionMetadata(ssTable, hasMaxCompressedLength)).orElse(null);
+            }
+        }
+        return readCompressionMetadata(ssTable, hasMaxCompressedLength).orElse(null);
+    }
+
+    private static Optional<CompressionMetadata> readCompressionMetadata(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
+    {
+        try (InputStream cis = ssTable.openCompressionStream())
+        {
+            if (cis != null)
+            {
+                return Optional.of(CompressionMetadata.fromInputStream(cis, hasMaxCompressedLength));
+            }
+        }
+        return Optional.empty();
+    }
+
     boolean containsSummary(@NotNull SSTable ssTable)
     {
         return contains(summary, ssTable);
@@ -123,6 +174,11 @@ public class SSTableCache
         return contains(stats, ssTable);
     }
 
+    boolean containsCompressionMetadata(@NotNull SSTable ssTable)
+    {
+        return contains(compressionMetadata, ssTable);
+    }
+
     boolean containsFilter(@NotNull SSTable ssTable)
     {
         return contains(filter, ssTable);
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index a5bbc36..2b3c7c2 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -24,7 +24,6 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -628,22 +627,19 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         SSTableStreamReader() throws IOException
         {
             lastToken = sparkRangeFilter != null ? sparkRangeFilter.tokenRange().upperEndpoint() : null;
-            try (@Nullable InputStream compressionInfoInputStream = ssTable.openCompressionStream())
-            {
-                DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
+            @Nullable CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
+            DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
 
-                if (compressionInfoInputStream != null)
-                {
-                    dataStream = CompressedRawInputStream.fromInputStream(ssTable,
-                                                                          dataInputStream,
-                                                                          compressionInfoInputStream,
-                                                                          version.hasMaxCompressedLength(),
-                                                                          stats);
-                }
-                else
-                {
-                    dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
-                }
+            if (compressionMetadata != null)
+            {
+                dataStream = CompressedRawInputStream.from(ssTable,
+                                                           dataInputStream,
+                                                           compressionMetadata,
+                                                           stats);
+            }
+            else
+            {
+                dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
             }
             dis = new DataInputStream(dataStream);
             if (startOffset != null)
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
index fc9db86..4ffe403 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
@@ -80,6 +80,7 @@ public class SSTableCacheTests
                     assertFalse(SSTableCache.INSTANCE.containsSummary(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
                     SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0);
                     assertNotNull(key1);
@@ -87,6 +88,7 @@ public class SSTableCacheTests
                     assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
                     Pair<DecoratedKey, DecoratedKey> key2 = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable0);
                     assertEquals(key1.first(), key2.left);
@@ -95,6 +97,7 @@ public class SSTableCacheTests
                     assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
                     Descriptor descriptor0 = Descriptor.fromFilename(
                             new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile0));
@@ -104,6 +107,7 @@ public class SSTableCacheTests
                     assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
                     assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
                     assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
                     assertEquals(componentMap, SSTableCache.INSTANCE.componentMapFromStats(ssTable0, descriptor0));
 
                     BloomFilter filter = SSTableCache.INSTANCE.bloomFilter(ssTable0, descriptor0);
@@ -111,9 +115,18 @@ public class SSTableCacheTests
                     assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
                     assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
                     assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable0));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
                     assertTrue(filter.isPresent(key1.first()));
                     assertTrue(filter.isPresent(key1.last()));
 
+                    CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable0, descriptor0.version.hasMaxCompressedLength());
+                    assertNotNull(compressionMetadata);
+                    assertTrue(SSTableCache.INSTANCE.containsSummary(ssTable0));
+                    assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
+                    assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
+                    assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable0));
+                    assertTrue(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
+
                     SSTable ssTable1 = ssTables.get(1);
                     Descriptor descriptor1 = Descriptor.fromFilename(
                             new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile1));
@@ -121,6 +134,7 @@ public class SSTableCacheTests
                     assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable1));
                     assertFalse(SSTableCache.INSTANCE.containsStats(ssTable1));
                     assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable1));
+                    assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1));
                     SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1);
                     assertNotEquals(key1.first(), key3.first());
                     assertNotEquals(key1.last(), key3.last());
@@ -138,6 +152,8 @@ public class SSTableCacheTests
                     assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable1));
                     assertTrue(SSTableCache.INSTANCE.containsStats(ssTable1));
                     assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable1));
+                    SSTableCache.INSTANCE.compressionMetadata(ssTable1, descriptor1.version.hasMaxCompressedLength());
+                    assertTrue(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1));
                 }
                 catch (IOException exception)
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

