This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 972535d  CASSANDRA-19927: Remove old compression cache and move to using cache of CompressionMetadata (#84)
972535d is described below

commit 972535d0f7cd828b7e0e40706adbe8897a436a5d
Author: jberragan <jberra...@gmail.com>
AuthorDate: Tue Sep 17 15:52:45 2024 -0700

    CASSANDRA-19927: Remove old compression cache and move to using cache of CompressionMetadata (#84)

    Deprecate old compression cache and move to using cache of CompressionMetadata, so that:

    - we no longer cache an entire byte array on heap
    - we cache and re-use the CompressionMetadata object so that only one BigLongArray object is allocated for the chunk offsets

    Patch by James Berragan; Reviewed by Yifan Cai; Francisco Guerrero for CASSANDRA-19927
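In outline, the patch replaces a cache of raw CompressionInfo.db bytes with a cache of the parsed metadata object. A minimal sketch of the before/after shapes, using Guava caches as the code itself does (the class name and stub types here are hypothetical stand-ins for the project's real classes; the sizes are the defaults visible in the diff below):

    import java.util.Optional;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    final class CompressionCacheSketch
    {
        static class SSTable {}             // stand-in for org.apache.cassandra.spark.data.SSTable
        static class CompressionMetadata {} // stand-in for the reader's parsed metadata

        // Before: whole CompressionInfo.db files were held on heap as byte[],
        // and every reader re-parsed the bytes, allocating a fresh BigLongArray
        // of chunk offsets each time.
        static final Cache<String, byte[]> RAW_BYTES =
                CacheBuilder.newBuilder().maximumSize(2048).build();

        // After: the parsed CompressionMetadata is cached per SSTable, so the
        // chunk-offset array is allocated once; Optional.empty() records that
        // an uncompressed SSTable has no CompressionInfo.db component at all.
        static final Cache<SSTable, Optional<CompressionMetadata>> PARSED =
                CacheBuilder.newBuilder().maximumSize(128).build();
    }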
---
 .../apache/cassandra/spark/utils/Properties.java   |  2 -
 .../java/org/apache/cassandra/clients/Sidecar.java | 38 ++---
 .../cassandra/spark/data/CassandraDataLayer.java   | 12 +---
 .../spark/data/SidecarProvisionedSSTable.java      | 31 ---------
 .../cassandra/clients/SidecarClientConfigTest.java | 20 ------
 .../spark/data/SidecarProvisionedSSTableTest.java  | 73 ----------------------
 .../spark/reader/CompressedRawInputStream.java     | 15 ++++-
 .../spark/reader/CompressionMetadata.java          |  2 +-
 .../apache/cassandra/spark/reader/IndexReader.java | 16 +----
 .../cassandra/spark/reader/SSTableCache.java       | 58 ++++++++++++++++-
 .../cassandra/spark/reader/SSTableReader.java      | 28 ++++-----
 .../cassandra/spark/reader/SSTableCacheTests.java  | 16 +++++
 12 files changed, 107 insertions(+), 204 deletions(-)

diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
index 6597f85..06c0056 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java
@@ -40,8 +40,6 @@ public final class Properties
     public static final int DEFAULT_MAX_RETRIES = 10;
     public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
     public static final int DEFAULT_MAX_POOL_SIZE = 64;
-    public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true;
-    public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB
     public static final long DEFAULT_MAX_BUFFER_SIZE = 6 * MEBI_BYTES;
     public static final long DEFAULT_CHUNK_BUFFER_SIZE = 4 * MEBI_BYTES;
     public static final Map<FileType, Long> DEFAULT_MAX_BUFFER_OVERRIDE = ImmutableMap.of(
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 2f17040..fee8116 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -58,12 +58,10 @@ import org.apache.cassandra.spark.validation.SslValidation;
 import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.cassandra.spark.validation.TrustStoreValidation;
 
-import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CACHE_COMPRESSION_METADATA;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
-import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
@@ -212,8 +210,6 @@ public final class Sidecar
         public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes";
         public static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
         public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
-        public static final String CACHE_COMPRESSION_METADATA_KEY = "cacheCompressionMetadata";
-        public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxSizeCacheCompressionMetadataBytes";
 
         private final int userProvidedPort;
         private final int maxRetries;
@@ -225,8 +221,6 @@ public final class Sidecar
         private final long chunkSize;
         private final Map<FileType, Long> maxBufferOverride;
         private final Map<FileType, Long> chunkBufferOverride;
-        private final boolean cacheCompressionMetadata;
-        private final long maxSizeCacheCompressionMetadataBytes;
 
         // CHECKSTYLE IGNORE: Constructor with many parameters
         private ClientConfig(int userProvidedPort,
@@ -238,9 +232,7 @@ public final class Sidecar
                              int maxPoolSize,
                              int timeoutSeconds,
                              Map<FileType, Long> maxBufferOverride,
-                             Map<FileType, Long> chunkBufferOverride,
-                             boolean cacheCompressionMetadata,
-                             long maxSizeCacheCompressionMetadataBytes)
+                             Map<FileType, Long> chunkBufferOverride)
         {
             this.userProvidedPort = userProvidedPort;
             this.maxRetries = maxRetries;
@@ -252,8 +244,6 @@ public final class Sidecar
             this.timeoutSeconds = timeoutSeconds;
             this.maxBufferOverride = maxBufferOverride;
             this.chunkBufferOverride = chunkBufferOverride;
-            this.cacheCompressionMetadata = cacheCompressionMetadata;
-            this.maxSizeCacheCompressionMetadataBytes = maxSizeCacheCompressionMetadataBytes;
         }
 
         public int userProvidedPort()
@@ -342,9 +332,7 @@ public final class Sidecar
                                 DEFAULT_MAX_POOL_SIZE,
                                 DEFAULT_TIMEOUT_SECONDS,
                                 DEFAULT_MAX_BUFFER_OVERRIDE,
-                                DEFAULT_CHUNK_BUFFER_OVERRIDE,
-                                DEFAULT_CACHE_COMPRESSION_METADATA,
-                                DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES);
+                                DEFAULT_CHUNK_BUFFER_OVERRIDE);
         }
 
         public static ClientConfig create(Map<String, String> options)
@@ -359,9 +347,7 @@ public final class Sidecar
                                 MapUtils.getInt(options, MAX_POOL_SIZE_KEY, DEFAULT_MAX_POOL_SIZE),
                                 MapUtils.getInt(options, TIMEOUT_SECONDS_KEY, DEFAULT_TIMEOUT_SECONDS),
                                 buildMaxBufferOverride(options, DEFAULT_MAX_BUFFER_OVERRIDE),
-                                buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE),
-                                MapUtils.getBoolean(options, CACHE_COMPRESSION_METADATA_KEY, DEFAULT_CACHE_COMPRESSION_METADATA),
-                                MapUtils.getLong(options, MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY, DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES)
+                                buildChunkBufferOverride(options, DEFAULT_CHUNK_BUFFER_OVERRIDE)
                                 );
         }
 
@@ -401,9 +387,7 @@ public final class Sidecar
                                           int maxPoolSize,
                                           int timeoutSeconds,
                                           Map<FileType, Long> maxBufferOverride,
-                                          Map<FileType, Long> chunkBufferOverride,
-                                          boolean cacheCompressionMetadata,
-                                          long maxSizeCacheCompressionMetadataBytes)
+                                          Map<FileType, Long> chunkBufferOverride)
         {
             return new ClientConfig(userProvidedPort,
                                     maxRetries,
@@ -414,19 +398,7 @@ public final class Sidecar
                                     maxPoolSize,
                                     timeoutSeconds,
                                     maxBufferOverride,
-                                    chunkBufferOverride,
-                                    cacheCompressionMetadata,
-                                    maxSizeCacheCompressionMetadataBytes);
-        }
-
-        public long maxSizeCacheCompressionMetadataBytes()
-        {
-            return maxSizeCacheCompressionMetadataBytes;
-        }
-
-        public boolean cacheCompressionMetadata()
-        {
-            return cacheCompressionMetadata;
+                                    chunkBufferOverride);
         }
     }
 }
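With the two compression-cache options removed, a Sidecar.ClientConfig is built from the remaining option keys only. A hypothetical call for illustration (key strings taken from the constants visible above; this snippet is not part of the patch):

    // "cacheCompressionMetadata" and "maxSizeCacheCompressionMetadataBytes" are
    // no longer recognized options; metadata caching is instead tuned through the
    // sbr.cache.compressionInfo.* system properties shown further below.
    Sidecar.ClientConfig config = Sidecar.ClientConfig.create(
            ImmutableMap.of("maxPoolSize", "64",
                            "timeoutSeconds", "60"));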
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 369e58e..d997385 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -696,9 +696,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                       in.readInt(),
                                                       in.readInt(),
                                                       (Map<FileType, Long>) in.readObject(),
-                                                      (Map<FileType, Long>) in.readObject(),
-                                                      in.readBoolean(),
-                                                      in.readLong());
+                                                      (Map<FileType, Long>) in.readObject());
         this.sslConfig = (SslConfig) in.readObject();
         this.cqlTable = bridge.javaDeserialize(in, CqlTable.class);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
@@ -750,8 +748,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         out.writeInt(this.sidecarClientConfig.timeoutSeconds());
         out.writeObject(this.sidecarClientConfig.maxBufferOverride());
         out.writeObject(this.sidecarClientConfig.chunkBufferOverride());
-        out.writeBoolean(this.sidecarClientConfig.cacheCompressionMetadata());
-        out.writeLong(this.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
         out.writeObject(this.sslConfig);
         bridge.javaSerialize(out, this.cqlTable);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
         out.writeObject(this.tokenPartitioner);
@@ -820,8 +816,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             out.writeInt(dataLayer.sidecarClientConfig.timeoutSeconds());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.maxBufferOverride());
             kryo.writeObject(out, dataLayer.sidecarClientConfig.chunkBufferOverride());
-            out.writeBoolean(dataLayer.sidecarClientConfig.cacheCompressionMetadata());
-            out.writeLong(dataLayer.sidecarClientConfig.maxSizeCacheCompressionMetadataBytes());
             kryo.writeObjectOrNull(out, dataLayer.sslConfig, SslConfig.class);
             kryo.writeObject(out, dataLayer.cqlTable);
             kryo.writeObject(out, dataLayer.tokenPartitioner);
@@ -871,9 +865,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                in.readInt(),
                                                in.readInt(),
                                                (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                               (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                               in.readBoolean(),
-                                               in.readLong()),
+                                               (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
                    kryo.readObjectOrNull(in, SslConfig.class),
                    kryo.readObject(in, CqlTable.class),
                    kryo.readObject(in, TokenPartitioner.class),
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index d065302..70523cd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -19,18 +19,10 @@
 
 package org.apache.cassandra.spark.data;
 
-import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.commons.io.IOUtils;
 
 import o.a.c.sidecar.client.shaded.common.response.ListSnapshotFilesResponse;
 import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
@@ -39,7 +31,6 @@ import org.apache.cassandra.clients.SidecarStreamConsumerAdapter;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.stats.Stats;
-import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
 import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
@@ -52,11 +43,6 @@ import org.jetbrains.annotations.Nullable;
 public class SidecarProvisionedSSTable extends SSTable
 {
     private static final long serialVersionUID = 6452703925812602832L;
-    @VisibleForTesting
-    public static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder()
-                                                                              .expireAfterAccess(1, TimeUnit.HOURS)
-                                                                              .maximumSize(2048)
-                                                                              .build();
     private final SidecarClient sidecar;
     private final SidecarInstance instance;
     private final Sidecar.ClientConfig sidecarClientConfig;
@@ -153,23 +139,6 @@ public class SidecarProvisionedSSTable extends SSTable
             return null;
         }
 
-        if (fileType == FileType.COMPRESSION_INFO
-            && sidecarClientConfig.cacheCompressionMetadata()
-            && snapshotFile.size < sidecarClientConfig.maxSizeCacheCompressionMetadataBytes())
-        {
-            String key = String.format("%s/%s/%s/%s/%s", instance.hostname(), keyspace, table, snapshotName, snapshotFile.fileName);
-            byte[] bytes;
-            try
-            {
-                bytes = COMPRESSION_CACHE.get(key, () -> IOUtils.toByteArray(open(snapshotFile, fileType)));
-            }
-            catch (ExecutionException exception)
-            {
-                throw new RuntimeException(ThrowableUtils.rootCause(exception));
-            }
-            return new ByteArrayInputStream(bytes);
-        }
-
         return open(snapshotFile, fileType);
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
index 2985c5f..bff83f3 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarClientConfigTest.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.spark.bulkwriter.DataTransport;
 
 import static org.apache.cassandra.spark.bulkwriter.BulkSparkConf.DEFAULT_SIDECAR_PORT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -115,23 +114,4 @@ public class SidecarClientConfigTest
         userAgentStr = Sidecar.transportModeBasedWriterUserAgent(DataTransport.S3_COMPAT);
         assertTrue(userAgentStr.endsWith(" writer-s3"));
     }
-
-    @Test
-    public void testCompressionCacheDefaults()
-    {
-        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of());
-        assertTrue(clientConfig.cacheCompressionMetadata());
-        assertEquals(8L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
-    }
-
-    @Test
-    public void testCompressionCache()
-    {
-        Sidecar.ClientConfig clientConfig = Sidecar.ClientConfig.create(ImmutableMap.of(
-                "cachecompressionmetadata", "false",
-                "maxsizecachecompressionmetadatabytes", "4194304")
-        );
-        assertFalse(clientConfig.cacheCompressionMetadata());
-        assertEquals(4L * 1024L * 1024L, clientConfig.maxSizeCacheCompressionMetadataBytes());
-    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
index edcdd4e..21d2085 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -19,12 +19,7 @@
 
 package org.apache.cassandra.spark.data;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-
 import com.google.common.collect.ImmutableMap;
-import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -35,13 +30,9 @@ import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.spark.stats.Stats;
-import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -150,70 +141,6 @@ class SidecarProvisionedSSTableTest
                 .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName));
     }
 
-    private static final byte[] ARRAY = new byte[]{'a', 'b', 'c'};
-
-    @Test
-    public void testCompressionCache()
-    {
-        SSTable ssTable = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertArrayEquals(ARRAY, IOUtils.toByteArray(is));
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testCompressionCacheDisabled()
-    {
-        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("cachecompressionmetadata", "false")),
-                                       "localhost1", 9043,
-                                       "keyspace1", "table1",
-                                       "snapshot1", "na-1-big-Data.db",
-                                       524288);
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertNotNull(is);
-            // when not cached it should return a BufferingInputStream
-            // but in the tests it's backed by nothing so don't consume
-            assertInstanceOf(BufferingInputStream.class, is);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testCompressionCacheTooLarge()
-    {
-        SSTable ssTable = prepareTable(Sidecar.ClientConfig.create(Collections.singletonMap("maxsizecachecompressionmetadatabytes", "4194304")),
-                                       "localhost1", 9043,
-                                       "keyspace1", "table1",
-                                       "snapshot1", "na-1-big-Data.db",
-                                       5 * 1025 * 1024);
-        String key = String.format("%s/%s/%s/%s/%s", "localhost1", "keyspace2", "table2", "snapshot2", "na-2-big-Data.db");
-        SidecarProvisionedSSTable.COMPRESSION_CACHE.put(key, ARRAY);
-        try (InputStream is = ssTable.openCompressionStream())
-        {
-            assertNotNull(is);
-            // when not cached it should return a BufferingInputStream
-            // but in the tests it's backed by nothing so don't consume
-            assertInstanceOf(BufferingInputStream.class, is);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     SSTable prepareTable(String sidecarHostName,
                          int sidecarPort,
                          String keyspace,
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
index 03bf2d6..ca9f6da 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java
@@ -87,11 +87,22 @@ public final class CompressedRawInputStream extends RawInputStream
                                               InputStream compressionInfoInputStream,
                                               boolean hasCompressedLength,
                                               Stats stats) throws IOException
+    {
+        return from(ssTable,
+                    dataInputStream,
+                    CompressionMetadata.fromInputStream(compressionInfoInputStream,
+                                                        hasCompressedLength),
+                    stats);
+    }
+
+    static CompressedRawInputStream from(@Nullable SSTable ssTable,
+                                         DataInputStream dataInputStream,
+                                         CompressionMetadata compressionMetadata,
+                                         Stats stats)
     {
         return new CompressedRawInputStream(ssTable,
                                             dataInputStream,
-                                            CompressionMetadata.fromInputStream(compressionInfoInputStream,
-                                                                                hasCompressedLength),
+                                            compressionMetadata,
                                             stats);
     }
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
index 1c89853..2140ea1 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.spark.reader.common.BigLongArray;
  */
 // CompressionMetadata is mocked in IndexReaderTests and mockito does not support mocking final classes
 // CHECKSTYLE IGNORE: FinalClass
-class CompressionMetadata extends AbstractCompressionMetadata
+public class CompressionMetadata extends AbstractCompressionMetadata
 {
     private final CompressionParams parameters;
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
index e112b5f..2fe4541 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
@@ -96,7 +96,7 @@ public class IndexReader implements IIndexReader
             }
 
             // read CompressionMetadata if it exists
-            CompressionMetadata compressionMetadata = getCompressionMetadata(ssTable, version.hasMaxCompressedLength());
+            CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
             if (compressionMetadata != null)
             {
                 stats.indexCompressionFileRead(System.nanoTime() - now);
@@ -133,20 +133,6 @@ public class IndexReader implements IIndexReader
         }
     }
 
-    @Nullable
-    public static CompressionMetadata getCompressionMetadata(SSTable ssTable,
-                                                             boolean hasMaxCompressedLength) throws IOException
-    {
-        try (InputStream cis = ssTable.openCompressionStream())
-        {
-            if (cis != null)
-            {
-                return CompressionMetadata.fromInputStream(cis, hasMaxCompressedLength);
-            }
-        }
-        return null;
-    }
-
     @SuppressWarnings("InfiniteLoopStatement")
     static void consumePrimaryIndex(@NotNull IPartitioner partitioner,
                                     @NotNull InputStream primaryIndex,
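IndexReader's static getCompressionMetadata helper is gone; readers now resolve metadata through the singleton cache. The calling pattern this patch converges on looks roughly like the following sketch (variable names are illustrative; SSTableReader below uses exactly this shape):

    // Returns the cached (or freshly parsed) metadata, or null when the SSTable
    // has no CompressionInfo.db component, i.e. the table is uncompressed.
    CompressionMetadata metadata =
            SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
    if (metadata != null)
    {
        // Compressed path: wrap the data stream, re-using the parsed metadata
        // instead of re-reading and re-parsing CompressionInfo.db.
        dataStream = CompressedRawInputStream.from(ssTable, dataInputStream, metadata, stats);
    }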
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index f13d4d5..6e9079e 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -20,10 +20,13 @@
 package org.apache.cassandra.spark.reader;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -35,11 +38,13 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.Pair;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Basic cache to reduce wasteful requests on the DataLayer for cacheable SSTable metadata,
@@ -59,15 +64,35 @@ public class SSTableCache
                                                                  propOrDefault("sbr.cache.stats.expireAfterMins", 60));
     private final Cache<SSTable, BloomFilter> filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384),
                                                                   propOrDefault("sbr.cache.filter.expireAfterMins", 60));
+    // if compression is disabled then the CompressionInfo.db file will not exist
+    // therefore we can cache as Optional to a) avoid null errors in the cache and b) record that the component does not exist
+    private final Cache<SSTable, Optional<CompressionMetadata>> compressionMetadata = buildCache(
+    propOrDefault("sbr.cache.compressionInfo.maxEntries", 128),
+    propOrDefault("sbr.cache.compressionInfo.expireAfterMins", 15));
 
     private static int propOrDefault(String name, int defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Integer::parseInt);
+    }
+
+    private static long propOrDefault(String name, long defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Long::parseLong);
+    }
+
+    private static boolean propOrDefault(String name, boolean defaultValue)
+    {
+        return propOrDefault(name, defaultValue, Boolean::parseBoolean);
+    }
+
+    private static <T> T propOrDefault(String name, T defaultValue, Function<String, T> parser)
     {
         String str = System.getProperty(name);
         if (str != null)
         {
             try
             {
-                return Integer.parseInt(str);
+                return parser.apply(str);
             }
             catch (NumberFormatException exception)
             {
@@ -108,6 +133,32 @@ public class SSTableCache
         return get(filter, ssTable, () -> ReaderUtils.readFilter(ssTable, descriptor.version.hasOldBfFormat()));
     }
 
+    @Nullable
+    public CompressionMetadata compressionMetadata(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
+    {
+        if (propOrDefault("sbr.cache.compressionInfo.enabled", true))
+        {
+            long maxSize = propOrDefault("sbr.cache.compressionInfo.maxSize", 0L);
+            if (maxSize <= 0 || ssTable.length(FileType.COMPRESSION_INFO) < maxSize)
+            {
+                return get(compressionMetadata, ssTable, () -> readCompressionMetadata(ssTable, hasMaxCompressedLength)).orElse(null);
+            }
+        }
+        return readCompressionMetadata(ssTable, hasMaxCompressedLength).orElse(null);
+    }
+
+    private static Optional<CompressionMetadata> readCompressionMetadata(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
+    {
+        try (InputStream cis = ssTable.openCompressionStream())
+        {
+            if (cis != null)
+            {
+                return Optional.of(CompressionMetadata.fromInputStream(cis, hasMaxCompressedLength));
+            }
+        }
+        return Optional.empty();
+    }
+
     boolean containsSummary(@NotNull SSTable ssTable)
     {
         return contains(summary, ssTable);
@@ -123,6 +174,11 @@ public class SSTableCache
         return contains(stats, ssTable);
     }
 
+    boolean containsCompressionMetadata(@NotNull SSTable ssTable)
+    {
+        return contains(compressionMetadata, ssTable);
+    }
+
     boolean containsFilter(@NotNull SSTable ssTable)
     {
         return contains(filter, ssTable);
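Tuning thus moves from Sidecar client options to JVM system properties read by SSTableCache. Reading the diff above: enabled and maxSize are consulted on each lookup, while maxEntries and expireAfterMins are read once when the cache is built. A hypothetical configuration roughly matching the removed defaults (cache on, only files under 8 MiB), for illustration only:

    // Set before the reader runs; values other than the property names are assumptions.
    System.setProperty("sbr.cache.compressionInfo.enabled", "true");
    System.setProperty("sbr.cache.compressionInfo.maxSize",
                       String.valueOf(8L * 1024 * 1024)); // bytes; <= 0 disables the size check
    System.setProperty("sbr.cache.compressionInfo.maxEntries", "128");
    System.setProperty("sbr.cache.compressionInfo.expireAfterMins", "15");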
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index a5bbc36..2b3c7c2 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -24,7 +24,6 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -628,22 +627,19 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         SSTableStreamReader() throws IOException
         {
             lastToken = sparkRangeFilter != null ? sparkRangeFilter.tokenRange().upperEndpoint() : null;
-            try (@Nullable InputStream compressionInfoInputStream = ssTable.openCompressionStream())
-            {
-                DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
+            @Nullable CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
+            DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
 
-                if (compressionInfoInputStream != null)
-                {
-                    dataStream = CompressedRawInputStream.fromInputStream(ssTable,
-                                                                          dataInputStream,
-                                                                          compressionInfoInputStream,
-                                                                          version.hasMaxCompressedLength(),
-                                                                          stats);
-                }
-                else
-                {
-                    dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
-                }
+            if (compressionMetadata != null)
+            {
+                dataStream = CompressedRawInputStream.from(ssTable,
+                                                           dataInputStream,
+                                                           compressionMetadata,
+                                                           stats);
+            }
+            else
+            {
+                dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
             }
             dis = new DataInputStream(dataStream);
             if (startOffset != null)
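The practical effect for SSTableReader and IndexReader is that repeated opens of the same SSTable share one parsed object, which is what keeps the BigLongArray allocation to one per table rather than one per reader. A hedged illustration of that property (relies on standard Guava cache semantics and assumes caching is enabled and the entry has not been evicted; this is not a test from the patch):

    // While the entry is live in the cache, both lookups return the same
    // CompressionMetadata instance, so its chunk-offset BigLongArray is
    // parsed and allocated only once.
    CompressionMetadata first = SSTableCache.INSTANCE.compressionMetadata(ssTable, hasMaxCompressedLength);
    CompressionMetadata second = SSTableCache.INSTANCE.compressionMetadata(ssTable, hasMaxCompressedLength);
    assert first == second;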
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
index fc9db86..4ffe403 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
@@ -80,6 +80,7 @@ public class SSTableCacheTests
             assertFalse(SSTableCache.INSTANCE.containsSummary(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
             SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0);
             assertNotNull(key1);
@@ -87,6 +88,7 @@ public class SSTableCacheTests
             assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
             Pair<DecoratedKey, DecoratedKey> key2 = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable0);
             assertEquals(key1.first(), key2.left);
@@ -95,6 +97,7 @@ public class SSTableCacheTests
             assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsStats(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
 
             Descriptor descriptor0 = Descriptor.fromFilename(
             new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile0));
@@ -104,6 +107,7 @@ public class SSTableCacheTests
             assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
             assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
             assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable0));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
             assertEquals(componentMap, SSTableCache.INSTANCE.componentMapFromStats(ssTable0, descriptor0));
 
             BloomFilter filter = SSTableCache.INSTANCE.bloomFilter(ssTable0, descriptor0);
@@ -111,9 +115,18 @@ public class SSTableCacheTests
             assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
             assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
             assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable0));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
             assertTrue(filter.isPresent(key1.first()));
             assertTrue(filter.isPresent(key1.last()));
 
+            CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable0, descriptor0.version.hasMaxCompressedLength());
+            assertNotNull(compressionMetadata);
+            assertTrue(SSTableCache.INSTANCE.containsSummary(ssTable0));
+            assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable0));
+            assertTrue(SSTableCache.INSTANCE.containsStats(ssTable0));
+            assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable0));
+            assertTrue(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0));
+
             SSTable ssTable1 = ssTables.get(1);
             Descriptor descriptor1 = Descriptor.fromFilename(
             new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile1));
@@ -121,6 +134,7 @@ public class SSTableCacheTests
             assertFalse(SSTableCache.INSTANCE.containsIndex(ssTable1));
             assertFalse(SSTableCache.INSTANCE.containsStats(ssTable1));
             assertFalse(SSTableCache.INSTANCE.containsFilter(ssTable1));
+            assertFalse(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1));
             SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1);
             assertNotEquals(key1.first(), key3.first());
             assertNotEquals(key1.last(), key3.last());
@@ -138,6 +152,8 @@ public class SSTableCacheTests
             assertTrue(SSTableCache.INSTANCE.containsIndex(ssTable1));
             assertTrue(SSTableCache.INSTANCE.containsStats(ssTable1));
             assertTrue(SSTableCache.INSTANCE.containsFilter(ssTable1));
+            SSTableCache.INSTANCE.compressionMetadata(ssTable1, descriptor1.version.hasMaxCompressedLength());
+            assertTrue(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1));
         }
         catch (IOException exception)
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org