yifan-c commented on code in PR #173:
URL:
https://github.com/apache/cassandra-analytics/pull/173#discussion_r2843538025
##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java:
##########
@@ -109,6 +121,53 @@ public static boolean primaryIndexContainsAnyKey(@NotNull SSTable ssTable,
return exists.get();
}
+ @Nullable
+ public static Long startOffsetInDataFile(@NotNull SSTable ssTable,
+ @NotNull TableMetadata metadata,
+ @NotNull Descriptor descriptor,
+ @NotNull TokenRange tokenRange)
+ {
+ final AtomicReference<Long> offset = new AtomicReference<>(null);
+ try
+ {
+ // Loading data file is required by BtiTableReader.
+ // Migrate to org.apache.cassandra.io.sstable.format.SSTableReader#getApproximatePositionsForRanges
Review Comment:
I cannot find the method in Cassandra trunk `a9ee34b62d977893380b0b753c25b2b0aa68fa11`. Is it from an unmerged patch in Cassandra?
How about filing a JIRA (in Analytics) and referencing it in this comment so we don't forget?
##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java:
##########
@@ -109,6 +121,53 @@ public static boolean primaryIndexContainsAnyKey(@NotNull SSTable ssTable,
return exists.get();
}
+ @Nullable
+ public static Long startOffsetInDataFile(@NotNull SSTable ssTable,
+ @NotNull TableMetadata metadata,
+ @NotNull Descriptor descriptor,
+ @NotNull TokenRange tokenRange)
+ {
+ final AtomicReference<Long> offset = new AtomicReference<>(null);
+ try
+ {
+ // Loading data file is required by BtiTableReader.
+ // Migrate to org.apache.cassandra.io.sstable.format.SSTableReader#getApproximatePositionsForRanges
+ // when it is available.
+ withPartitionIndex(ssTable, descriptor, metadata, true, false, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
+ TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata);
+ BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor)
+ .setDataFile(dataFileHandle)
+ .setPartitionIndex(partitionIndex)
+ .setComponents(indexComponents)
+ .setTableMetadataRef(metadataRef)
+ .setFilter(FilterFactory.AlwaysPresent)
+ .build(null, false, false);
+ Token tokenStart = TokenUtils.bigIntegerToToken(metadata.partitioner, tokenRange.lowerEndpoint());
+ Token tokenEnd = TokenUtils.bigIntegerToToken(metadata.partitioner, tokenRange.upperEndpoint());
+ Range<Token> range = new Range<>(tokenStart, tokenEnd);
+ try
+ {
+ List<SSTableReader.PartitionPositionBounds> position =
+ btiTableReader.getPositionsForRanges(Collections.singletonList(range));
+ if (position.size() == 1)
+ {
+ offset.set(position.get(0).lowerPosition);
+ }
Review Comment:
1. nit: rename to `positions`?
2. When the size of `positions` is not 1, the result is silently ignored. Is that expected? If so, please add a comment explaining why.
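A possible way to make point 2 explicit, sketched under assumptions: an empty list is taken to mean the range does not overlap this SSTable (return `null`), and more than one bound (for instance, possibly a wrap-around range being split during normalization; this should be confirmed against `getPositionsForRanges`) falls back to the smallest lower position. `PositionBounds` is a hypothetical stand-in for `SSTableReader.PartitionPositionBounds`, and `startOffset` is not part of the PR.

```java
import java.util.List;

public class StartOffsetSketch
{
    // Hypothetical stand-in for SSTableReader.PartitionPositionBounds.
    public static final class PositionBounds
    {
        public final long lowerPosition;
        public final long upperPosition;

        public PositionBounds(long lowerPosition, long upperPosition)
        {
            this.lowerPosition = lowerPosition;
            this.upperPosition = upperPosition;
        }
    }

    // Returns null when no bounds were found (assumed: the range does not overlap the SSTable);
    // otherwise returns the smallest lower position across all bounds, so the
    // "more than one bound" case is handled deliberately rather than dropped.
    public static Long startOffset(List<PositionBounds> positions)
    {
        if (positions.isEmpty())
        {
            return null;
        }
        long min = Long.MAX_VALUE;
        for (PositionBounds bounds : positions)
        {
            min = Math.min(min, bounds.lowerPosition);
        }
        return min;
    }
}
```

Whether "smallest lower position" or "throw on unexpected size" is the right policy depends on how callers use the offset; the point is that either choice is visible in code instead of implied by an unhandled branch.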
##########
cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java:
##########
@@ -46,6 +47,20 @@ public static BigInteger tokenToBigInteger(final Token token)
throw new UnsupportedOperationException("Unexpected token type: " + token.getClass().getName());
}
+ public static Token bigIntegerToToken(final IPartitioner partitioner, final BigInteger token)
Review Comment:
nit: remove `final`.
Also, the method seems to be unused?
##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java:
##########
@@ -109,6 +121,53 @@ public static boolean primaryIndexContainsAnyKey(@NotNull SSTable ssTable,
return exists.get();
}
+ @Nullable
+ public static Long startOffsetInDataFile(@NotNull SSTable ssTable,
+ @NotNull TableMetadata metadata,
+ @NotNull Descriptor descriptor,
+ @NotNull TokenRange tokenRange)
+ {
+ final AtomicReference<Long> offset = new AtomicReference<>(null);
+ try
+ {
+ // Loading data file is required by BtiTableReader.
+ // Migrate to org.apache.cassandra.io.sstable.format.SSTableReader#getApproximatePositionsForRanges
+ // when it is available.
+ withPartitionIndex(ssTable, descriptor, metadata, true, false, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
+ TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata);
+ BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor)
+ .setDataFile(dataFileHandle)
+ .setPartitionIndex(partitionIndex)
+ .setComponents(indexComponents)
+ .setTableMetadataRef(metadataRef)
+ .setFilter(FilterFactory.AlwaysPresent)
+ .build(null, false, false);
+ Token tokenStart = TokenUtils.bigIntegerToToken(metadata.partitioner, tokenRange.lowerEndpoint());
+ Token tokenEnd = TokenUtils.bigIntegerToToken(metadata.partitioner, tokenRange.upperEndpoint());
+ Range<Token> range = new Range<>(tokenStart, tokenEnd);
Review Comment:
These lines are unlikely to throw, but if they do, `btiTableReader` is never released. Can we ensure the reader is released in that case too?
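A minimal sketch of the shape this could take, assuming the reader exposes some release operation: `Releasable` and `useThenRelease` are hypothetical names, and the actual release call for `BtiTableReader` is not shown in this diff. The idea is that everything after the reader is built, including the token conversions, runs inside `try`/`finally`.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class ReleaseSketch
{
    // Hypothetical stand-in for a reader that must be released after use.
    public interface Releasable
    {
        void release();
    }

    // Builds the resource, then runs every step that may throw (token conversion,
    // Range construction, position lookup) inside try/finally, so release() is
    // called on both the normal and the exceptional path.
    public static <R extends Releasable, T> T useThenRelease(Supplier<R> open, Function<R, T> body)
    {
        R resource = open.get();
        try
        {
            return body.apply(resource);
        }
        finally
        {
            resource.release();
        }
    }
}
```

Applied to this PR, the simplest equivalent would likely be moving the two `bigIntegerToToken` calls and the `Range` construction into the existing `try` block, assuming that block releases the reader on exit (that part is outside this hunk).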
##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/util/ReadOnlyInputStreamFileChannel.java:
##########
@@ -54,7 +54,7 @@ public int read(ByteBuffer dst) throws IOException
if (dst.position() == 0 && dst.limit() > 0)
{
// o.a.c.io.util.SimpleChunkReader flips the buffer, so position should be set to the end.
- dst.position(read);
+ dst.position(Math.min(read, dst.limit()));
Review Comment:
When can `read` be greater than `dst.limit()`? If `Math.min` is actually necessary, I'm a bit concerned that the guard masks a real bug.
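For reference, `ByteBuffer.position(int)` already throws `IllegalArgumentException` when the new position is larger than the limit, so without `Math.min` an oversized `read` would fail rather than be silently clamped. If a guard is kept, one hedged alternative is to fail loudly with context instead of clamping; `setPositionAfterRead` below is a hypothetical helper, not part of the PR.

```java
import java.nio.ByteBuffer;

public class PositionGuardSketch
{
    // Hypothetical helper: after a read into dst (which SimpleChunkReader will flip),
    // move the position to the end of the bytes read, but throw with a descriptive
    // message instead of clamping when read exceeds the buffer limit.
    public static void setPositionAfterRead(ByteBuffer dst, int read)
    {
        if (read > dst.limit())
        {
            throw new IllegalStateException(
                "Read " + read + " bytes into a buffer with limit " + dst.limit());
        }
        dst.position(read);
    }
}
```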
--
This is an automated message from the Apache Git Service.