yifan-c commented on code in PR #155:
URL: https://github.com/apache/cassandra-analytics/pull/155#discussion_r2536119762


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilter.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link TimeRangeFilter} to filter out based on timestamp.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public final class TimeRangeFilter implements Serializable

Review Comment:
   Maybe consider removing the `final` modifier to allow extension.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilter.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link TimeRangeFilter} to filter out based on timestamp.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public final class TimeRangeFilter implements Serializable
+{
+    private final Range<Long> timeRange;
+
+    /**
+     * Creates a {@link TimeRangeFilter} with given time {@link Range}
+     */
+    private TimeRangeFilter(Range<Long> timeRange)
+    {
+        this.timeRange = timeRange;
+    }
+
+    /**
+     * Returns the underlying Range.
+     *
+     * @return the time range
+     */
+    @NotNull
+    public Range<Long> range()
+    {
+        return timeRange;
+    }
+
+    /**
+     * Determines if given start and end timestamp match the filter. SSTable is included if it overlaps with
+     * filter time range.
+     *
+     * @param givenStart the SSTable min timestamp
+     * @param givenEnd the SSTable max timestamp
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+    public boolean filter(long givenStart, long givenEnd)
+    {
+        // Create range for the given SSTable timestamps, always closed
+        Range<Long> sstableTimeRange = Range.closed(givenStart, givenEnd);
+
+        // Check if ranges are connected (overlap or adjacent)
+        return timeRange.isConnected(sstableTimeRange) && !timeRange.intersection(sstableTimeRange).isEmpty();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("TimeRangeFilter%s", timeRange.toString());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (!(o instanceof TimeRangeFilter))
+        {
+            return false;
+        }
+        TimeRangeFilter that = (TimeRangeFilter) o;
+        return timeRange.equals(that.timeRange);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(timeRange);
+    }
+
+    /**
+     * Creates a {@link TimeRangeFilter} with only start bound.
+     *
+     * @param startTimestampMicros the start timestamp in microseconds (inclusive)
+     * @return {@link TimeRangeFilter} with only start timestamp
+     */
+    @NotNull
+    public static TimeRangeFilter startingAt(long startTimestampMicros)

Review Comment:
   👍 on adding the time unit here.
   
   Can you update the other time references to state that the time unit is
   micros? I believe this filter implementation only expects time in micros.
   Please consider documenting it in the class, and even renaming the class if
   that makes it more explicit.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilter.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link TimeRangeFilter} to filter out based on timestamp.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public final class TimeRangeFilter implements Serializable
+{
+    private final Range<Long> timeRange;
+
+    /**
+     * Creates a {@link TimeRangeFilter} with given time {@link Range}
+     */
+    private TimeRangeFilter(Range<Long> timeRange)
+    {
+        this.timeRange = timeRange;
+    }
+
+    /**
+     * Returns the underlying Range.
+     *
+     * @return the time range
+     */
+    @NotNull
+    public Range<Long> range()
+    {
+        return timeRange;
+    }
+
+    /**
+     * Determines if given start and end timestamp match the filter. SSTable is included if it overlaps with
+     * filter time range.
+     *
+     * @param givenStart the SSTable min timestamp
+     * @param givenEnd the SSTable max timestamp
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+    public boolean filter(long givenStart, long givenEnd)
+    {
+        // Create range for the given SSTable timestamps, always closed
+        Range<Long> sstableTimeRange = Range.closed(givenStart, givenEnd);
+
+        // Check if ranges are connected (overlap or adjacent)
+        return timeRange.isConnected(sstableTimeRange) && !timeRange.intersection(sstableTimeRange).isEmpty();

Review Comment:
   I think you can do this comparison without creating a new range instance.
   Could you update the implementation here? The `intersection()` method
   creates a new range each time.
   
   Given that both ranges being compared are closed, calling `isConnected`
   alone should be enough to tell the result. No?
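
   For illustration, a minimal sketch of an allocation-free check, assuming
   both ranges are closed. Two closed ranges share a point exactly when each
   one starts no later than the other ends. `ClosedMicrosRange` and its member
   names are hypothetical stand-ins, not the PR's `TimeRangeFilter`, and an
   unbounded side is modeled here with `Long.MIN_VALUE` / `Long.MAX_VALUE`
   rather than with Guava's unbounded ranges.

```java
// Sketch only: a hypothetical stand-in, not the TimeRangeFilter class from the PR.
final class ClosedMicrosRange
{
    private final long startMicrosInclusive;
    private final long endMicrosInclusive;

    ClosedMicrosRange(long startMicrosInclusive, long endMicrosInclusive)
    {
        if (startMicrosInclusive > endMicrosInclusive)
        {
            throw new IllegalArgumentException("Invalid range: [" + startMicrosInclusive
                                               + ", " + endMicrosInclusive + "]");
        }
        this.startMicrosInclusive = startMicrosInclusive;
        this.endMicrosInclusive = endMicrosInclusive;
    }

    // True when the two closed ranges share at least one timestamp.
    // Only primitive comparisons are used, so no range object is allocated per call.
    boolean overlaps(long otherStartMicrosInclusive, long otherEndMicrosInclusive)
    {
        return startMicrosInclusive <= otherEndMicrosInclusive
               && otherStartMicrosInclusive <= endMicrosInclusive;
    }
}
```

   With a filter of [100, 200], an SSTable spanning [0, 100] or [200, 300]
   overlaps, while [0, 50] and [250, 300] do not, which matches the inclusive
   cases in the unit tests of this patch.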



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java:
##########
@@ -249,6 +258,7 @@ private LocalDataLayer(@NotNull CassandraVersion version,
                            @NotNull List<SchemaFeature> requestedFeatures,
                            boolean useBufferingInputStream,
                            @Nullable String statsClass,
+                           @Nullable TimeRangeFilter sstableTimeRangeFilter,

Review Comment:
   I noticed that the patch uses `List.of()` for sstableTimeRangeFilter when
   it is not defined, but the `@Nullable` here implies that a null value can
   also be used. That is inconsistent. Please pick one; otherwise, it will
   bite us later.
   I would suggest changing the annotation to `@NotNull` and ensuring that
   call sites do not pass a null value.
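
   A rough sketch of the non-null convention, under the assumption that "no
   filter" is always represented by an empty list. `FilterConfig` is a
   hypothetical stand-in for the data layer constructor, and `String` stands
   in for `TimeRangeFilter` so the snippet is self-contained.

```java
import java.util.List;
import java.util.Objects;

// Sketch only: a hypothetical stand-in, not LocalDataLayer from the PR.
final class FilterConfig
{
    // String stands in for TimeRangeFilter to keep the sketch self-contained
    private final List<String> sstableTimeRangeFilters;

    FilterConfig(List<String> sstableTimeRangeFilters)
    {
        // Reject null up front so that "no filter" has a single representation: the empty list
        Objects.requireNonNull(sstableTimeRangeFilters,
                               "sstableTimeRangeFilters must not be null; pass List.of() for no filter");
        this.sstableTimeRangeFilters = List.copyOf(sstableTimeRangeFilters);
    }

    List<String> sstableTimeRangeFilters()
    {
        return sstableTimeRangeFilters;
    }

    boolean hasTimeRangeFilters()
    {
        return !sstableTimeRangeFilters.isEmpty();
    }
}
```

   Callers that have nothing to filter pass `List.of()`, so readers of the
   field never need a null check.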



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java:
##########
@@ -111,4 +115,29 @@ public static List<List<String>> cartesianProduct(List<List<String>> orderedValu
             }
         }
     }
+
+    /**
+     * Parses {@link TimeRangeFilter} from spark options.
+     */
+    public static TimeRangeFilter parseSSTableTimeRangeFilter(Map<String, String> options)
+    {
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS) && !options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return null;
+        }
+
+        long startTimestamp = MapUtils.getLong(options, SSTABLE_START_TIMESTAMP_MICROS, Long.MIN_VALUE);
+        if (!options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return TimeRangeFilter.startingAt(startTimestamp);
+        }

Review Comment:
   I do not think this branch is necessary. Both cases (the end time option
   exists or not) are already covered in `TimeRangeFilter.create`.
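
   One possible shape of the simplified parsing, as a sketch only. It assumes
   that a missing bound can be expressed through a default value
   (`Long.MIN_VALUE` for the start, as in the diff, and `Long.MAX_VALUE` for
   the end, which is my assumption). The option key strings and the
   `TimeRangeOptions` class are hypothetical; the `long[]` result stands in
   for a call to `TimeRangeFilter.create(start, end)`.

```java
import java.util.Map;

// Sketch only: hypothetical stand-ins, not FilterUtils or TimeRangeFilter from the PR.
final class TimeRangeOptions
{
    // Hypothetical key values; the values of the PR's constants are not shown in the diff
    static final String START_KEY = "sstable_start_timestamp_micros";
    static final String END_KEY = "sstable_end_timestamp_micros";

    // Returns {startMicros, endMicros}, both inclusive, or null when neither option is set
    // (null mirrors the current diff). The pair stands in for TimeRangeFilter.create(start, end).
    static long[] parse(Map<String, String> options)
    {
        if (!options.containsKey(START_KEY) && !options.containsKey(END_KEY))
        {
            return null;
        }
        // A single code path: a missing bound falls back to its default value
        long start = getLong(options, START_KEY, Long.MIN_VALUE);
        long end = getLong(options, END_KEY, Long.MAX_VALUE);
        return new long[]{ start, end };
    }

    private static long getLong(Map<String, String> options, String key, long defaultValue)
    {
        String value = options.get(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }
}
```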



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java:
##########
@@ -143,4 +146,161 @@ private void testTtlUsingConstantReferenceTimeHelper(CassandraBridge bridgeForTe
         .forAll(TestUtils.partitioners())
         .checkAssert(partitioner -> runTest(partitioner, bridgeForTest, test));
     }
+
+    // TimeRangeFilter Tests
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testTimeRangeFilterSkipsSSTablesOutsideRange(CassandraBridge bridge)
+    {
+        TestRunnable test = (partitioner, dir, bridgeInTest) -> {
+            TestSchema schema = TestSchema.builder(bridgeInTest)
+                                          .withPartitionKey("a", bridgeInTest.aInt())
+                                          .withColumn("b", bridgeInTest.aInt())
+                                          .build();
+
+            // Write SSTable with data
+            schema.writeSSTable(dir, bridgeInTest, partitioner, writer -> {
+                for (int i = 0; i < 10; i++)
+                {
+                    writer.write(i, i * 100);
+                }
+            });
+
+            // Wait to ensure timestamp advances
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            long currentTimestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); // Convert to microseconds

Review Comment:
   Can you look into using the `TimeProvider` to mock the time, instead of
   pausing?
   Check out `org.apache.cassandra.spark.SSTableReaderTests#testTtlUsingConstantReferenceTimeHelper`
   for a usage example.
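
   The general idea, as a sketch rather than the project's API: inject the
   time source so the test controls it and no sleep is needed. `LongSupplier`
   and `TimestampTracker` below are hypothetical stand-ins; the interface of
   the real `TimeProvider` is not shown in this diff.

```java
import java.util.function.LongSupplier;

// Sketch only: LongSupplier stands in for an injectable time source.
// It is not the project's TimeProvider.
final class TimestampTracker
{
    private final LongSupplier nowMicros;
    private long minTimestampMicros = Long.MAX_VALUE;
    private long maxTimestampMicros = Long.MIN_VALUE;

    TimestampTracker(LongSupplier nowMicros)
    {
        this.nowMicros = nowMicros;
    }

    // Records one write at whatever time the injected source reports
    void recordWrite()
    {
        long timestamp = nowMicros.getAsLong();
        minTimestampMicros = Math.min(minTimestampMicros, timestamp);
        maxTimestampMicros = Math.max(maxTimestampMicros, timestamp);
    }

    long minTimestampMicros()
    {
        return minTimestampMicros;
    }

    long maxTimestampMicros()
    {
        return maxTimestampMicros;
    }
}
```

   A test can then hold the clock in a one-element array, for example
   `long[] clock = {1_000L}` with `new TimestampTracker(() -> clock[0])`, and
   advance it by assigning `clock[0]` between writes, which keeps the
   timestamps deterministic.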



##########
cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilterTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link TimeRangeFilter}
+ */
+class TimeRangeFilterTest
+{
+    @Test
+    void testCreation()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.create(100L, 200L);
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(100);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(200);
+    }
+
+    @Test
+    void testThrowsExceptionWhenStartGreaterThanEnd()
+    {
+        assertThatThrownBy(() -> TimeRangeFilter.create(200L, 100L))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid range: [200‥100]");
+    }
+
+    @Test
+    void testFiltering()
+    {
+        // both start and end inclusive
+
+        TimeRangeFilter sstableBeforeInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [0, 50], Filter: [100, 200]
+        assertThat(sstableBeforeInclusiveBounds.filter(0L, 50L)).isFalse();
+
+        TimeRangeFilter sstableTouchingStartInclusiveBound = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [0, 100], Filter: [100, 200] - touching inclusive
+        assertThat(sstableTouchingStartInclusiveBound.filter(0L, 100L)).isTrue();
+
+        TimeRangeFilter sstableAfterInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [250, 300], Filter: [100, 200]
+        assertThat(sstableAfterInclusiveBounds.filter(250L, 300L)).isFalse();
+
+        TimeRangeFilter sstableTouchingEndInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [200, 300], Filter: [100, 200] - touching inclusive
+        assertThat(sstableTouchingEndInclusiveBounds.filter(200L, 300L)).isTrue();
+
+        TimeRangeFilter sstableWithinInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [120, 180], Filter: [100, 200]
+        assertThat(sstableWithinInclusiveBounds.filter(120L, 180L)).isTrue();
+
+        TimeRangeFilter sstableEndOverlapInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [50, 150], Filter: [100, 200]
+        assertThat(sstableEndOverlapInclusiveBounds.filter(50L, 150L)).isTrue();
+
+        TimeRangeFilter sstableStartOverlapInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [150, 250], Filter: [100, 200]
+        assertThat(sstableStartOverlapInclusiveBounds.filter(150L, 250L)).isTrue();
+
+        TimeRangeFilter sstableTimeRangeLargerThanInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [50, 250], Filter: [100, 200]
+        assertThat(sstableTimeRangeLargerThanInclusiveBounds.filter(50L, 250L)).isTrue();
+
+        // end inclusive
+
+        TimeRangeFilter sstableTouchingStartExclusive = TimeRangeFilter.create(101L, 200L);
+        // SSTable: [0, 100], Filter: (100, 200] - no overlap
+        assertThat(sstableTouchingStartExclusive.filter(0L, 100L)).isFalse();
+
+        TimeRangeFilter sstableOverlapWithStartExclusive = TimeRangeFilter.create(99L, 200L);
+        // SSTable: [50, 150], Filter: (100, 200]
+        assertThat(sstableOverlapWithStartExclusive.filter(50L, 150L)).isTrue();
+
+        // start inclusive
+
+        TimeRangeFilter sstableTouchingEndExclusive = TimeRangeFilter.create(100L, 199L);
+        // SSTable: [200, 300], Filter: [100, 200) - no overlap
+        assertThat(sstableTouchingEndExclusive.filter(200L, 300L)).isFalse();

Review Comment:
   This does not really create the range [100, 200). It creates [100, 199].
   The test case here and the other ones testing exclusiveness are all
   redundant and even misleading. Please define clearly that `TimeRangeFilter`
   creates only closed ranges of timestamps in micros.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilter.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link TimeRangeFilter} to filter out based on timestamp.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public final class TimeRangeFilter implements Serializable
+{
+    private final Range<Long> timeRange;
+
+    /**
+     * Creates a {@link TimeRangeFilter} with given time {@link Range}
+     */
+    private TimeRangeFilter(Range<Long> timeRange)
+    {
+        this.timeRange = timeRange;
+    }
+
+    /**
+     * Returns the underlying Range.
+     *
+     * @return the time range
+     */
+    @NotNull
+    public Range<Long> range()
+    {
+        return timeRange;
+    }
+
+    /**
+     * Determines if given start and end timestamp match the filter. SSTable is included if it overlaps with
+     * filter time range.
+     *
+     * @param givenStart the SSTable min timestamp
+     * @param givenEnd the SSTable max timestamp
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+    public boolean filter(long givenStart, long givenEnd)

Review Comment:
   Can you just rename the method to `overlaps`? It really checks whether 2
   ranges overlap.
   The "given" in the parameter names is redundant; please remove it and add
   "inclusive" and "micros" to the names instead, if the idea is to improve
   clarity.



##########
cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilterTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link TimeRangeFilter}
+ */
+class TimeRangeFilterTest
+{
+    @Test
+    void testCreation()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.create(100L, 200L);
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(100);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(200);
+    }
+
+    @Test
+    void testThrowsExceptionWhenStartGreaterThanEnd()
+    {
+        assertThatThrownBy(() -> TimeRangeFilter.create(200L, 100L))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid range: [200‥100]");
+    }
+
+    @Test
+    void testFiltering()
+    {
+        // both start and end inclusive
+
+        TimeRangeFilter sstableBeforeInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [0, 50], Filter: [100, 200]
+        assertThat(sstableBeforeInclusiveBounds.filter(0L, 50L)).isFalse();
+
+        TimeRangeFilter sstableTouchingStartInclusiveBound = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [0, 100], Filter: [100, 200] - touching inclusive
+        assertThat(sstableTouchingStartInclusiveBound.filter(0L, 100L)).isTrue();
+
+        TimeRangeFilter sstableAfterInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [250, 300], Filter: [100, 200]
+        assertThat(sstableAfterInclusiveBounds.filter(250L, 300L)).isFalse();
+
+        TimeRangeFilter sstableTouchingEndInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [200, 300], Filter: [100, 200] - touching inclusive
+        assertThat(sstableTouchingEndInclusiveBounds.filter(200L, 300L)).isTrue();
+
+        TimeRangeFilter sstableWithinInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [120, 180], Filter: [100, 200]
+        assertThat(sstableWithinInclusiveBounds.filter(120L, 180L)).isTrue();
+
+        TimeRangeFilter sstableEndOverlapInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [50, 150], Filter: [100, 200]
+        assertThat(sstableEndOverlapInclusiveBounds.filter(50L, 150L)).isTrue();
+
+        TimeRangeFilter sstableStartOverlapInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [150, 250], Filter: [100, 200]
+        assertThat(sstableStartOverlapInclusiveBounds.filter(150L, 250L)).isTrue();
+
+        TimeRangeFilter sstableTimeRangeLargerThanInclusiveBounds = TimeRangeFilter.create(100L, 200L);
+        // SSTable: [50, 250], Filter: [100, 200]
+        assertThat(sstableTimeRangeLargerThanInclusiveBounds.filter(50L, 250L)).isTrue();
+
+        // end inclusive
+
+        TimeRangeFilter sstableTouchingStartExclusive = TimeRangeFilter.create(101L, 200L);
+        // SSTable: [0, 100], Filter: (100, 200] - no overlap
+        assertThat(sstableTouchingStartExclusive.filter(0L, 100L)).isFalse();
+
+        TimeRangeFilter sstableOverlapWithStartExclusive = TimeRangeFilter.create(99L, 200L);
+        // SSTable: [50, 150], Filter: (100, 200]
+        assertThat(sstableOverlapWithStartExclusive.filter(50L, 150L)).isTrue();
+
+        // start inclusive
+
+        TimeRangeFilter sstableTouchingEndExclusive = TimeRangeFilter.create(100L, 199L);
+        // SSTable: [200, 300], Filter: [100, 200) - no overlap
+        assertThat(sstableTouchingEndExclusive.filter(200L, 300L)).isFalse();
+
+        TimeRangeFilter sstableOverlapEndExclusive = TimeRangeFilter.create(100L, 199L);
+        // SSTable: [150, 250], Filter: [100, 200)
+        assertThat(sstableOverlapEndExclusive.filter(150L, 250L)).isTrue();
+
+        // both start and end exclusive
+
+        TimeRangeFilter startEndExclusiveTouchingStart = TimeRangeFilter.create(101L, 199L);
+        // SSTable: [0, 100], Filter: (100, 200)
+        assertThat(startEndExclusiveTouchingStart.filter(0L, 100L)).isFalse();
+
+        TimeRangeFilter startEndExclusiveTouchingEnd = TimeRangeFilter.create(99L, 199L);
+        // SSTable: [200, 300], Filter: (100, 200)
+        assertThat(startEndExclusiveTouchingEnd.filter(200L, 300L)).isFalse();
+
+        TimeRangeFilter startEndExclusiveEnclosed = TimeRangeFilter.create(99L, 199L);
+        // SSTable: [120, 180], Filter: (100, 200)
+        assertThat(startEndExclusiveEnclosed.filter(120L, 180L)).isTrue();
+    }
+
+    @Test
+    void testStartingAtInclusive()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.startingAt(100L);
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(100L);
+        assertThat(filter.range().hasUpperBound()).isFalse();
+
+        assertThat(filter.filter(0L, 50L)).isFalse();
+        assertThat(filter.filter(0L, 100L)).isTrue();
+        assertThat(filter.filter(100L, 200L)).isTrue();
+        assertThat(filter.filter(200L, 300L)).isTrue();
+    }
+
+    @Test
+    void testStartingAtExclusive()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.startingAt(101L);
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(101);
+        assertThat(filter.range().hasUpperBound()).isFalse();
+
+        assertThat(filter.filter(0L, 100L)).isFalse();
+        assertThat(filter.filter(100L, 200L)).isTrue();
+    }
+
+    @Test
+    void testEndingAtInclusive()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.endingAt(200L);
+        assertThat(filter.range().hasLowerBound()).isFalse();
+        assertThat(filter.range().upperEndpoint()).isEqualTo(200);
+        assertThat(filter.range().hasUpperBound()).isTrue();
+
+        assertThat(filter.filter(0L, 100L)).isTrue();
+        assertThat(filter.filter(100L, 200L)).isTrue();
+        assertThat(filter.filter(200L, 300L)).isTrue();
+        assertThat(filter.filter(250L, 300L)).isFalse();
+    }
+
+    @Test
+    void testEndingAtExclusive()
+    {
+        TimeRangeFilter filter = TimeRangeFilter.endingAt(199L);
+        assertThat(filter.range().hasLowerBound()).isFalse();
+        assertThat(filter.range().upperEndpoint()).isEqualTo(199L);
+        assertThat(filter.range().hasUpperBound()).isTrue();
+
+        assertThat(filter.filter(100L, 200L)).isTrue();
+        assertThat(filter.filter(200L, 300L)).isFalse();
+    }
+
+    @Test
+    void testToString()
+    {
+        TimeRangeFilter boundedRangeFilter = TimeRangeFilter.create(100L, 200L);
+        assertThat(boundedRangeFilter.toString()).isEqualTo("TimeRangeFilter[100‥200]");
+
+        TimeRangeFilter startAtInclusiveFilter = TimeRangeFilter.startingAt(100L);
+        assertThat(startAtInclusiveFilter.toString()).isEqualTo("TimeRangeFilter[100‥+∞)");
+
+        TimeRangeFilter endAtInclusiveFilter = TimeRangeFilter.endingAt(200L);
+        assertThat(endAtInclusiveFilter.toString()).isEqualTo("TimeRangeFilter(-∞‥200]");

Review Comment:
   A negative value looks confusing for a time range. I would rather use 0 as
   the start.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java:
##########
@@ -264,6 +264,7 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull
                                                       Collections.emptyList() /* requestedFeatures */,
                                                       false /* useSSTableInputStream */,
                                                       null /* statsClass */,
+                                                      null,

Review Comment:
   Add a comment to be consistent with the other parameters.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java:
##########
@@ -111,4 +115,29 @@ public static List<List<String>> cartesianProduct(List<List<String>> orderedValu
             }
         }
     }
+
+    /**
+     * Parses {@link TimeRangeFilter} from spark options.
+     */
+    public static TimeRangeFilter parseSSTableTimeRangeFilter(Map<String, String> options)
+    {
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS) && !options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return null;

Review Comment:
   This is _only_ a nit:
   
   ```suggestion
               return TimeRangeFilter.empty(); // create a constant: Range.openClosed(0L, 0L) as empty range in TimeRangeFilter
   ```



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/TimeRangeFilter.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link TimeRangeFilter} to filter out based on timestamp.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public final class TimeRangeFilter implements Serializable
+{
+    private final Range<Long> timeRange;
+
+    /**
+     * Creates a {@link TimeRangeFilter} with given time {@link Range}
+     */
+    private TimeRangeFilter(Range<Long> timeRange)
+    {
+        this.timeRange = timeRange;
+    }
+
+    /**
+     * Returns the underlying Range.
+     *
+     * @return the time range
+     */
+    @NotNull
+    public Range<Long> range()
+    {
+        return timeRange;
+    }
+
+    /**
+     * Determines if given start and end timestamp match the filter. SSTable is included if it overlaps with
+     * filter time range.
+     *
+     * @param givenStart the SSTable min timestamp
+     * @param givenEnd the SSTable max timestamp
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+    public boolean filter(long givenStart, long givenEnd)
+    {
+        // Create range for the given SSTable timestamps, always closed
+        Range<Long> sstableTimeRange = Range.closed(givenStart, givenEnd);
+
+        // Check if ranges are connected (overlap or adjacent)
+        return timeRange.isConnected(sstableTimeRange) && !timeRange.intersection(sstableTimeRange).isEmpty();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("TimeRangeFilter%s", timeRange.toString());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (!(o instanceof TimeRangeFilter))
+        {
+            return false;
+        }
+        TimeRangeFilter that = (TimeRangeFilter) o;
+        return timeRange.equals(that.timeRange);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(timeRange);
+    }
+
+    /**
+     * Creates a {@link TimeRangeFilter} with only start bound.
+     *
+     * @param startTimestampMicros the start timestamp in microseconds 
(inclusive)
+     * @return {@link TimeRangeFilter} with only start timestamp
+     */
+    @NotNull
+    public static TimeRangeFilter startingAt(long startTimestampMicros)
+    {
+        Range<Long> range = Range.atLeast(startTimestampMicros);
+        return new TimeRangeFilter(range);
+    }
+
+    /**
+     * Creates a {@link TimeRangeFilter} with only end bound.
+     *
+     * @param endTimestampMicros the end timestamp in microseconds (inclusive)
+     * @return {@link TimeRangeFilter} with only end timestamp
+     */
+    @NotNull
+    public static TimeRangeFilter endingAt(long endTimestampMicros)
+    {
+        Range<Long> range = Range.atMost(endTimestampMicros);

Review Comment:
   Please consider using `Range.closed(0L, endTimestampMicros)` instead (the lower bound needs to be a `long` literal to produce a `Range<Long>`).
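
A minimal sketch of what an always-closed range could look like, written in plain Java rather than Guava so that it stands alone. `ClosedTimeRange` and its methods are hypothetical names, not part of the PR, and the sketch assumes write timestamps are never negative, so that `[0, end]` covers everything an "at most `end`" range would.

```java
/** Hypothetical stand-in for a filter whose bounds are always closed (inclusive). */
final class ClosedTimeRange
{
    private final long start;
    private final long end;

    ClosedTimeRange(long start, long end)
    {
        if (start > end)
        {
            throw new IllegalArgumentException("start must not exceed end");
        }
        this.start = start;
        this.end = end;
    }

    /** Assumption: timestamps are never negative, so 0 can serve as the lower bound. */
    static ClosedTimeRange endingAt(long endTimestampMicros)
    {
        return new ClosedTimeRange(0L, endTimestampMicros);
    }

    /** Two closed ranges overlap when each one starts no later than the other ends. */
    boolean overlaps(long givenStart, long givenEnd)
    {
        return start <= givenEnd && givenStart <= end;
    }
}
```

With both bounds always present, the overlap check reduces to two comparisons and needs no separate handling for unbounded ranges.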



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java:
##########
@@ -198,6 +199,11 @@ public SparkRangeFilter sparkRangeFilter(int partitionId)
         return null;
     }
 
+    public List<TimeRangeFilter> sstableTimeRangeFilters()

Review Comment:
   Please add javadoc. In addition, define the meaning of the empty list. 
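
A sketch of what the requested javadoc could look like. The wording, and in particular the meaning given to the empty list (no time-based filtering), is an assumption for illustration and would need to be confirmed by the author. `DataLayerSketch` is a hypothetical stand-in, and the element type is left generic so that the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the data layer, only to host the documented method. */
class DataLayerSketch<F>
{
    /**
     * Returns the time range filters applied when selecting SSTables to read.
     * <p>
     * Assumed contract: an empty list means that no time-based filtering is
     * configured, so every SSTable is read.
     *
     * @return the configured filters; never null, empty when filtering is disabled
     */
    public List<F> sstableTimeRangeFilters()
    {
        return Collections.emptyList();
    }
}
```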



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java:
##########
@@ -88,4 +97,98 @@ public void testEquality(CassandraBridge bridge)
         assertThat(dataLayer2).isEqualTo(dataLayer1);
         assertThat(dataLayer2.hashCode()).isEqualTo(dataLayer1.hashCode());
     }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testTimeRangeFilterFromOptions(CassandraBridge bridge)
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("version", bridge.getVersion().name());
+        options.put("partitioner", Partitioner.Murmur3Partitioner.name());
+        options.put("keyspace", "test_keyspace");
+        options.put("createstmt", SchemaTests.SCHEMA);
+        options.put("dirs", "/tmp/data1,/tmp/data2");
+        options.put("sstable_start_timestamp_micros", "1000");
+        options.put("sstable_end_timestamp_micros", "2000");
+
+        LocalDataLayer dataLayer = LocalDataLayer.from(options);
+
+        List<TimeRangeFilter> filters = dataLayer.sstableTimeRangeFilters();
+        assertThat(filters).hasSize(1);

Review Comment:
   When will there be a list of `TimeRangeFilter`s? How is the list 
configured?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java:
##########
@@ -81,6 +83,14 @@ public class ClientConfig
     public static final String ENABLE_EXPANSION_SHRINK_CHECK_KEY = 
"enableExpansionShrinkCheck";
     public static final String SIDECAR_PORT = "sidecar_port";
     public static final String QUOTE_IDENTIFIERS = "quote_identifiers";
+    /**
+     * {@code sstable_start_timestamp} and {@code sstable_end_timestamp} are 
by default inclusive. Both start and end
+     * timestamps are represented in microseconds. To have exclusive bounds, 
add +1 offset for exclusive start bound
+     * and add -1 for exclusive end bound.

Review Comment:
   The comment is confusing to read. Both ends of the range are _always_ 
closed/inclusive.
   
   I would just have this.
   ```suggestion
        * {@code sstable_start_timestamp_micros} and {@code 
sstable_end_timestamp_micros} define a time range filter
        * for SSTable selection. Both timestamps are represented in 
microseconds and both bounds are always inclusive
        * (closed range).
   ```
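
Since both bounds are always inclusive, a caller who wants an exclusive bound has to shift it by one microsecond before setting the option. A small sketch of that arithmetic follows. The `InclusiveBounds` helper and its method names are hypothetical; the option names are the ones used in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that converts exclusive bounds into the inclusive option values. */
final class InclusiveBounds
{
    /** An exclusive start bound becomes inclusive by moving it one microsecond later. */
    static long fromExclusiveStart(long exclusiveStartMicros)
    {
        return Math.addExact(exclusiveStartMicros, 1L);
    }

    /** An exclusive end bound becomes inclusive by moving it one microsecond earlier. */
    static long fromExclusiveEnd(long exclusiveEndMicros)
    {
        return Math.subtractExact(exclusiveEndMicros, 1L);
    }

    /** Builds reader options that select the open interval (start, end). */
    static Map<String, String> options(long exclusiveStartMicros, long exclusiveEndMicros)
    {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("sstable_start_timestamp_micros",
                    Long.toString(fromExclusiveStart(exclusiveStartMicros)));
        options.put("sstable_end_timestamp_micros",
                    Long.toString(fromExclusiveEnd(exclusiveEndMicros)));
        return options;
    }
}
```

`Math.addExact` and `Math.subtractExact` are used so that a bound at the edge of the `long` range fails loudly instead of wrapping around.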
   
   



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java:
##########
@@ -111,4 +115,29 @@ public static List<List<String>> 
cartesianProduct(List<List<String>> orderedValu
             }
         }
     }
+
+    /**
+     * Parses {@link TimeRangeFilter} from spark options.
+     */
+    public static TimeRangeFilter parseSSTableTimeRangeFilter(Map<String, 
String> options)
+    {
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS) && 
!options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return null;
+        }
+
+        long startTimestamp = MapUtils.getLong(options, 
SSTABLE_START_TIMESTAMP_MICROS, Long.MIN_VALUE);
+        if (!options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return TimeRangeFilter.startingAt(startTimestamp);
+        }
+
+        long endTimestamp = MapUtils.getLong(options, 
SSTABLE_END_TIMESTAMP_MICROS, Long.MAX_VALUE);
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS))
+        {
+            return TimeRangeFilter.endingAt(endTimestamp);
+        }

Review Comment:
   Not necessary for the same reason. 
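
One possible reading of this comment, assuming "the same reason" refers to always building a closed range with default bounds: the parser would no longer need separate start-only and end-only branches. The sketch below is plain Java and hypothetical throughout (`TimeRangeOptions`, the `long[]` return type and the chosen defaults are not from the PR).

```java
import java.util.Map;

/** Hypothetical parser that always yields a closed [start, end] pair. */
final class TimeRangeOptions
{
    static final String START_KEY = "sstable_start_timestamp_micros";
    static final String END_KEY = "sstable_end_timestamp_micros";

    /** Returns {start, end}, both inclusive, or null when neither option is set. */
    static long[] parse(Map<String, String> options)
    {
        if (!options.containsKey(START_KEY) && !options.containsKey(END_KEY))
        {
            return null;
        }
        // Assumed defaults: 0 when the start is absent, Long.MAX_VALUE when the end is absent
        long start = options.containsKey(START_KEY) ? Long.parseLong(options.get(START_KEY)) : 0L;
        long end = options.containsKey(END_KEY) ? Long.parseLong(options.get(END_KEY)) : Long.MAX_VALUE;
        if (start > end)
        {
            throw new IllegalArgumentException("start timestamp must not exceed end timestamp");
        }
        return new long[]{ start, end };
    }
}
```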



##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import 
org.testcontainers.shaded.com.google.common.util.concurrent.Uninterruptibles;
+
+import static 
org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
+import static 
org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for various filters used during bulk reading.
+ */
+class BulkReaderFilteringIntegrationTest extends 
SharedClusterSparkIntegrationTestBase
+{
+    static final int DATA_SIZE = 1000;
+
+    QualifiedName twcsTable = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    // Use base timestamp that's 10 minutes in the past
+    static final long BASE_TIMESTAMP_MILLIS = System.currentTimeMillis() - 
TimeUnit.MINUTES.toMillis(10);
+
+    // Separate each batch by 2 minutes to ensure they go into different TWCS 
windows (1 minute window size)
+    static final long EARLY_TIMESTAMP_MICROS = 
TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS);
+    static final long MIDDLE_TIMESTAMP_MICROS = 
TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + 
TimeUnit.MINUTES.toMillis(2));
+    static final long LATE_TIMESTAMP_MICROS = 
TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + 
TimeUnit.MINUTES.toMillis(4));
+
+    @Test
+    void testReadAllDataWithoutTimeRangeFilter()
+    {
+        // Read all data without any time range filter
+        Map<String, String> timeRangeOptions = Map.of();
+        int expectedDataSize = DATA_SIZE * 3; // all 3 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS, LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartBoundInclusive()
+    {
+        // Read data starting MIDDLE_TIMESTAMP
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, 
Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS, 
LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartBoundExclusive()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS + 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithEndBoundInclusive()
+    {
+        // Read data ending with MIDDLE_TIMESTAMP inclusive
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, 
Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithEndBoundExclusive()
+    {
+        // Read data ending with MIDDLE_TIMESTAMP exclusive
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS - 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartAndEndBound()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, 
Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString(),
+                                                      
SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartAndEndBoundExclusive()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS + 
1).toString(),
+                                                      
SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 
1).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterNonOverlappingBound()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS - 
1).toString());
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, 
timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(0); // no data read
+    }
+
+    private void runTimeRangeFilterTest(Map<String, String> timeRangeOptions,
+                                        int expectedDataSize,
+                                        Set<Long> expectedTimestamps)
+    {
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, 
timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(expectedDataSize);
+
+        Set<Long> allTimestamps = rows.stream()
+                                      .map(row -> row.getLong(2))
+                                      .collect(Collectors.toSet());
+
+        assertThat(expectedTimestamps.size()).isEqualTo(allTimestamps.size());
+        assertThat(expectedTimestamps).containsAll(allTimestamps);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+
+        // Initialize schema for SSTable time range filtering
+
+        // Create table with TWCS compaction strategy with compaction window 1 
minute
+        createTestTable(twcsTable, "CREATE TABLE IF NOT EXISTS %s (" +
+                                   "    id text PRIMARY KEY," +
+                                   "    data text," +
+                                   "    timestamp bigint" +
+                                   ") WITH compaction = {" +
+                                   "    'class': 
'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'," +
+                                   "    'compaction_window_size': '1'," +
+                                   "    'compaction_window_unit': 'MINUTES'" +
+                                   "};");
+
+        // create 3 SSTables in 3 time windows, each SSTable created 2 mins 
apart
+        IInstance instance = cluster.getFirstRunningInstance();
+        ICoordinator coordinator = instance.coordinator();
+
+        // Insert early data with early timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            long timestamp = EARLY_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) 
VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, i, "data" + i, 
EARLY_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create first SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // wait for nodetool flush
+        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
+
+        // Insert middle data with middle timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            int id = DATA_SIZE + i;
+            long timestamp = MIDDLE_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) 
VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, id, "data" + id, 
MIDDLE_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create second SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // wait for nodetool flush
+        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

Review Comment:
   This test suite is slow. Each test spends 1 minute sleeping. 
   Why would nodetool flush take so long? 
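
If the sleeps exist only to wait for some observable condition (an assumption; the diff does not say what they wait for), polling with a timeout would usually return much sooner than a fixed 30-second sleep. A hypothetical sketch, where `Await.until` and the condition passed to it are illustrative and not part of the test framework:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper to replace a fixed sleep. */
final class Await
{
    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean until(BooleanSupplier condition, long timeout, TimeUnit unit, long pollMillis)
    {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        while (true)
        {
            if (condition.getAsBoolean())
            {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0)
            {
                return false;
            }
            try
            {
                Thread.sleep(pollMillis);
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The condition is checked before the first sleep, so a wait whose condition already holds costs nothing.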



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



