yifan-c commented on code in PR #139: URL: https://github.com/apache/cassandra-analytics/pull/139#discussion_r2380422803
########## cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.spark.reader.common.AbstractCompressionMetadata; +import org.apache.cassandra.spark.reader.common.BigLongArray; + +/** + * Holds metadata about compressed file + */ +// CompressionMetadata is mocked in IndexReaderTests and mockito does not support mocking final classes +// CHECKSTYLE IGNORE: FinalClass +public class CompressionMetadata extends AbstractCompressionMetadata +{ + private final CompressionParams parameters; + private final double crcCheckChance; // CRC check chance defined on table level + + private CompressionMetadata(long dataLength, BigLongArray chunkOffsets, CompressionParams parameters, double crcCheckChance) + 
{ + super(dataLength, chunkOffsets); + this.parameters = parameters; + this.crcCheckChance = crcCheckChance; + } + + static CompressionMetadata fromInputStream(InputStream inStream, boolean hasCompressedLength, double crcCheckChance) throws IOException + { + long dataLength; + BigLongArray chunkOffsets; + + DataInputStream inData = new DataInputStream(inStream); + + String compressorName = inData.readUTF(); + int optionCount = inData.readInt(); + Map<String, String> options = new HashMap<>(optionCount); + for (int option = 0; option < optionCount; ++option) + { + options.put(inData.readUTF(), inData.readUTF()); + } + + int chunkLength = inData.readInt(); + int minCompressRatio = 2147483647; + if (hasCompressedLength) + { + minCompressRatio = inData.readInt(); + } + + CompressionParams params = new CompressionParams(compressorName, chunkLength, minCompressRatio, options); + + dataLength = inData.readLong(); + + int chunkCount = inData.readInt(); + chunkOffsets = new BigLongArray(chunkCount); + + for (int chunk = 0; chunk < chunkCount; chunk++) + { + try + { + chunkOffsets.set(chunk, inData.readLong()); + } + catch (EOFException exception) + { + throw new EOFException(String.format("Corrupted compression index: read %d but expected %d chunks.", + chunk, chunkCount)); + } + } + + return new CompressionMetadata(dataLength, chunkOffsets, params, crcCheckChance); + } + + ICompressor compressor() + { + return parameters.getSstableCompressor(); + } + + @Override + protected int chunkLength() + { + return parameters.chunkLength(); + } + + @Override + protected double crcCheckChance() + { + return crcCheckChance; + } + + /** + * @return Cassandra internal {@code CompressionMetadata}, which can be used to construct {@code FileHandle}. 
+ */ + public org.apache.cassandra.io.compress.CompressionMetadata toInternal(File file, long compressedFileLength) + { + AlignedReadonlyLongArrayMemory memory = new AlignedReadonlyLongArrayMemory(chunkOffsets); + return new org.apache.cassandra.io.compress.CompressionMetadata(file, Review Comment: It would be nice to have the stream API provided by Cassandra directly so that there is no need to have the `toInternal` method and the memory class. But, it is what it is. ########## cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.ReadOnlyInputStreamFileChannel; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.spark.data.FileType; +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.reader.IndexConsumer; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.ReaderUtils; +import org.apache.cassandra.spark.reader.SSTableCache; +import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; +import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; +import org.apache.cassandra.spark.utils.streaming.BufferingInputStream; +import org.apache.cassandra.utils.FilterFactory; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.cassandra.spark.reader.BigIndexReader.calculateCompressedSize; + +public class BtiReaderUtils +{ + private static final Set<Component> indexComponents = 
ImmutableSet.of(BtiFormat.Components.DATA, + BtiFormat.Components.PARTITION_INDEX, + BtiFormat.Components.ROW_INDEX); + + private BtiReaderUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + public static TokenRange partitionIndexTokenRange(@NotNull SSTable ssTable, Review Comment: Overlaps with `org.apache.cassandra.spark.reader.ReaderUtils#keysFromIndex` ########## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/BigIndexReader.java: ########## @@ -271,6 +271,7 @@ private static long chunkCompressedLength(AbstractCompressionMetadata.Chunk chun return chunk.length >= 0 ? chunk.length : compressedDataLength - chunk.offset; } + // TODO: Adjust tests names. Review Comment: TODO ########## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java: ########## @@ -101,6 +101,20 @@ public BufferingInputStream(CassandraFileSource<T> source, BufferingInputStreamS this.stats = stats; } + public BufferingInputStream(CassandraFileSource<T> source, BufferingInputStreamStats<T> stats, long position) + { + this(source, stats); + this.rangeStart = position; + this.bytesRead = position == 0 ? 0 : (position + 1); Review Comment: Add a comment to clarify the reasoning behind the calculation of `bytesRead`. ########## cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.io.sstable.format.bti; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.ReadOnlyInputStreamFileChannel; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.spark.data.FileType; +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.reader.IndexConsumer; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.ReaderUtils; +import org.apache.cassandra.spark.reader.SSTableCache; +import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; +import 
org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; +import org.apache.cassandra.spark.utils.streaming.BufferingInputStream; +import org.apache.cassandra.utils.FilterFactory; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.cassandra.spark.reader.BigIndexReader.calculateCompressedSize; + +public class BtiReaderUtils Review Comment: Please add tests for the methods in `BtiReaderUtils` ########## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java: ########## @@ -113,6 +113,10 @@ private <T> Cache<SSTable, T> buildCache(int size, int expireAfterMins) public SummaryDbUtils.Summary keysFromSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException { + if (ssTable.isBtiFormat()) + { + return null; + } Review Comment: This is tricky. I think it diverges from the API contract. The original implementation either returns the cached value or throws an IOException, inheriting from the `cache.get` API. We should maintain the same contract here and throw an IOException. ########## cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/util/ReadOnlyInputStreamFileChannel.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import org.apache.cassandra.spark.utils.streaming.BufferingInputStream; + +public class ReadOnlyInputStreamFileChannel extends FileChannel +{ + private BufferingInputStream<?> inputStream; + private final long size; + private long position; + + public ReadOnlyInputStreamFileChannel(BufferingInputStream<?> inputStream, long size) + { + this.inputStream = inputStream; + this.size = size; + this.position = 0; + } + + public int read(ByteBuffer dst) throws IOException + { + // setup appropriate remaining size of the buffer + int streamRemaining = Math.toIntExact(Math.min(size - position, Integer.MAX_VALUE)); + int newLimit = dst.position() + Math.min(streamRemaining, dst.remaining()); + dst.limit(Math.min(newLimit, dst.capacity())); + + int read = inputStream.read(dst); + position += read; + if (dst.position() == 0 && dst.limit() > 0) + { + // TODO(c4c5): C* SimpleChunkReader flips the buffer, so position should be set to the end. Review Comment: Is the TODO addressed? ########## cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/SSTableRequirementExtension.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.jupiter.api.extension.DynamicTestInvocationContext; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.InvocationInterceptor; +import org.junit.jupiter.api.extension.ReflectiveInvocationContext; + +import org.apache.cassandra.bridge.CassandraVersion; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class SSTableRequirementExtension implements InvocationInterceptor +{ + public <T> T interceptTestClassConstructor(Invocation<T> invocation, + ReflectiveInvocationContext<Constructor<T>> invocationContext, + ExtensionContext extensionContext) throws Throwable + { + SSTableRequirement versionRequirement = extensionContext + .getRequiredTestClass() + .getAnnotation(SSTableRequirement.class); + skipIfOutOfScope(versionRequirement); + return invocation.proceed(); + } + + public void interceptTestMethod(Invocation<Void> invocation, + ReflectiveInvocationContext<Method> invocationContext, + ExtensionContext extensionContext) throws Throwable + { + interceptTestMethod(invocation, extensionContext); + } + + public void interceptDynamicTest(Invocation<Void> invocation, + DynamicTestInvocationContext invocationContext, + ExtensionContext extensionContext) throws Throwable + { + interceptTestMethod(invocation, extensionContext); + } + + public void interceptTestTemplateMethod(Invocation<Void> invocation, + ReflectiveInvocationContext<Method> 
invocationContext, + ExtensionContext extensionContext) throws Throwable + { + interceptTestMethod(invocation, extensionContext); + } + + private void interceptTestMethod(Invocation<Void> invocation, + ExtensionContext extensionContext) throws Throwable + { + SSTableRequirement versionRequirement = extensionContext + .getRequiredTestMethod() + .getAnnotation(SSTableRequirement.class); + skipIfOutOfScope(versionRequirement); + invocation.proceed(); + } + + private void skipIfOutOfScope(SSTableRequirement requirement) + { + if (requirement != null) + { + assumeTrue(CassandraVersion.sstableFormat().equals(requirement.format()), + requirement::description); + } Review Comment: It seems simpler to just use the `assume` API, instead of creating the testing annotation and the extension code. (Prefer the `assumeThat` API from AssertJ) ########## cassandra-analytics-core/build.gradle: ########## @@ -182,6 +182,8 @@ tasks.register('testSequential', Test) { } } + // TODO: Add BTI tests in integration tests. + systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big") Review Comment: Is this todo addressed? ########## .circleci/config.yml: ########## @@ -195,7 +198,33 @@ jobs: - store_test_results: path: build/test-reports - spark3-2_13-jdk11: + spark3-2_13-jdk11-big: Review Comment: Curious, why is it necessary to have dedicated pipelines per sstable format? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
