This is an automated email from the ASF dual-hosted git repository.

bernardobotella pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f883a74 CASSANALYTICS-95: Remove extra buffer flips (#148)
4f883a74 is described below

commit 4f883a746d2304ff8403018e59fdf671d918e694
Author: Bernardo Botella <[email protected]>
AuthorDate: Thu Oct 30 14:44:30 2025 +0100

    CASSANALYTICS-95: Remove extra buffer flips (#148)
    
    Removed unnecessary buffer flips.
---
 CHANGES.txt                                        |   1 +
 cassandra-four-zero-bridge/build.gradle            |   1 +
 .../cassandra/io/util/CdcRandomAccessReader.java   |   2 -
 .../io/util/CdcRandomAccessReaderTest.java         | 448 +++++++++++++++++++++
 4 files changed, 450 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 04956e51..5463af35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Remove unnecessary buffer flips (CASSANALYTICS-95)
  * Refactor BulkWriterContext broadcasting to use immutable config class (CASSANALYTICS-89)
  * Bump sidecar dependency to 0.2.0 (CASSANALYTICS-93)
  * Support for Trie-Indexed SSTables (BTI format) (CASSANALYTICS-27)
diff --git a/cassandra-four-zero-bridge/build.gradle b/cassandra-four-zero-bridge/build.gradle
index add277fb..be3f86fd 100644
--- a/cassandra-four-zero-bridge/build.gradle
+++ b/cassandra-four-zero-bridge/build.gradle
@@ -60,6 +60,7 @@ dependencies {
     testImplementation(group: "${sparkGroupId}", name: 
"spark-core_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
     testImplementation(group: "${sparkGroupId}", name: 
"spark-sql_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
     testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: 
'1.5.0-4')
+    testImplementation(group: 'io.vertx', name: 'vertx-core', version: 
"${vertxVersion}")
 
     testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: 
"${jnaVersion}")
     testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: 
"${jnaVersion}")
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
index ec487fc5..d4c44cdc 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java
@@ -116,7 +116,6 @@ public class CdcRandomAccessReader extends RandomAccessReader
                     BlockingStreamConsumer streamConsumer = new 
BlockingStreamConsumer();
                     source.request(offset, end, streamConsumer);
                     streamConsumer.getBytes(buffer);
-                    buffer.flip();
                     return this;
                 }
 
@@ -128,7 +127,6 @@ public class CdcRandomAccessReader extends RandomAccessReader
 
                 inputStream.read(buffer);
                 assert buffer.remaining() == 0;
-                buffer.flip();
             }
             catch (IOException e)
             {
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
new file mode 100644
index 00000000..5f3858c9
--- /dev/null
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import io.vertx.core.buffer.Buffer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+import org.apache.cassandra.spark.utils.streaming.StreamBuffer;
+import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+
+/**
+ * Unit tests for {@code CdcRandomAccessReader} and its {@code CDCRebuffer}.
+ * <p>
+ * The commit log, its file source and the stream buffers are in-memory stubs. Every byte served by the
+ * stub source equals the low 8 bits of its absolute offset, so the expected contents of any rebuffered
+ * window follow from its offset alone.
+ */
+public class CdcRandomAccessReaderTest
+{
+    private static final String COMMIT_LOG_PATH = "/test/path/commitlog";
+
+    private TestCommitLog testCommitLog;
+    private TestCassandraFileSource testSource;
+
+    @BeforeEach
+    public void setUp()
+    {
+        testSource = new TestCassandraFileSource();
+        testCommitLog = new TestCommitLog(COMMIT_LOG_PATH, testSource, ICdcStats.STUB, 1024L);
+        testSource.setCommitLog(testCommitLog);
+    }
+
+    @Test
+    public void testConstructorInitialization()
+    {
+        // Verify reader is properly initialized with commit log
+        CdcRandomAccessReader reader = new CdcRandomAccessReader(testCommitLog);
+
+        assertThat(reader).isNotNull();
+        assertThat(reader.getPath()).isEqualTo(COMMIT_LOG_PATH);
+    }
+
+    @Test
+    public void testCDCRebufferConstructorWithInvalidChunkSize()
+    {
+        // Verify constructor rejects zero chunk size
+        assertThatThrownBy(() -> new CdcRandomAccessReader.CDCRebuffer(testCommitLog, 0))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Chunk size must be a positive integer");
+
+        // Verify constructor rejects negative chunk size
+        assertThatThrownBy(() -> new CdcRandomAccessReader.CDCRebuffer(testCommitLog, -1))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Chunk size must be a positive integer");
+    }
+
+    /**
+     * Reads a 100-byte commit log as two consecutive 50-byte windows (offsets 0 and 50).
+     */
+    @Test
+    public void testCDCRebufferSequentialReading() throws IOException
+    {
+        // Setup: 100 bytes total, 50-byte buffer chunks
+        testCommitLog.setMaxOffset(100L);
+        final int bufferSize = 50;
+        testSource.setRequestHandler(call -> {
+            // BufferingInputStream requests a range - we must provide ALL requested data
+            long actualEnd = Math.min(call.end, testCommitLog.maxOffset());
+            deliverBytes(call.consumer, call.start, actualEnd, bufferSize);
+
+            // Signal end of stream when reaching EOF
+            if (actualEnd >= testCommitLog.maxOffset())
+            {
+                call.consumer.onEnd();
+            }
+        });
+        CdcRandomAccessReader.CDCRebuffer rebuffer = new CdcRandomAccessReader.CDCRebuffer(testCommitLog, bufferSize);
+
+        // First read: bytes 0-49
+        Rebufferer.BufferHolder holder = rebuffer.rebuffer(0);
+        assertThat(holder).isNotNull();
+        assertThat(rebuffer.offset()).isEqualTo(0L);
+        assertFullyWrittenSequence(holder, 0L, bufferSize);
+
+        // Second read: bytes 50-99 (sequential)
+        holder = rebuffer.rebuffer(50);
+        assertThat(holder).isNotNull();
+        assertThat(rebuffer.offset()).isEqualTo(50L);
+        assertFullyWrittenSequence(holder, 50L, bufferSize);
+    }
+
+    /**
+     * Reads the window at offset 50, then seeks back to offset 0 to exercise the backward-seek path.
+     */
+    @Test
+    public void testCDCRebufferBackwardSeek() throws IOException
+    {
+        // Setup: 100 bytes total, 50-byte buffer chunks
+        testCommitLog.setMaxOffset(100L);
+        final int bufferSize = 50;
+        testSource.setRequestHandler(call -> {
+            // Backward seek path has a bug: it requests buffer.remaining() + 1 bytes.
+            // Delivery is capped at the buffer capacity to avoid a BufferOverflowException from that
+            // extra byte, so the test can still check the state of the returned buffer.
+            long actualEnd = Math.min(call.end, testCommitLog.maxOffset());
+            long bytesToDeliver = Math.min(actualEnd - call.start, bufferSize);
+            deliverBytes(call.consumer, call.start, call.start + bytesToDeliver, bufferSize);
+
+            // Signal end when complete
+            call.consumer.onEnd();
+        });
+        CdcRandomAccessReader.CDCRebuffer rebuffer = new CdcRandomAccessReader.CDCRebuffer(testCommitLog, bufferSize);
+
+        // First, advance to position 50 (sequential read)
+        Rebufferer.BufferHolder holder = rebuffer.rebuffer(50);
+        assertThat(holder).isNotNull();
+        assertThat(rebuffer.offset()).isEqualTo(50L);
+
+        // Now seek backward to position 0 - triggers backward seek code path
+        holder = rebuffer.rebuffer(0);
+        assertThat(holder).isNotNull();
+        assertThat(rebuffer.offset()).isEqualTo(0L);
+        assertFullyWrittenSequence(holder, 0L, bufferSize);
+    }
+
+    /**
+     * Despite its name, this test does not read any data. It verifies that constructing a
+     * {@code CdcRandomAccessReader} is lazy: no range is requested from the source until data is read.
+     */
+    @Test
+    public void testCdcRandomAccessReaderEndToEnd()
+    {
+        // Setup commit log with 1024 bytes
+        testCommitLog.setMaxOffset(1024L);
+
+        // Serve any request in full and signal completion. An unexpected eager read then surfaces through
+        // the request-count assertion below instead of potentially waiting for data that never arrives.
+        testSource.setRequestHandler(call -> {
+            deliverBytes(call.consumer, call.start, call.end, Integer.MAX_VALUE);
+            call.consumer.onEnd();
+        });
+
+        CdcRandomAccessReader reader = new CdcRandomAccessReader(testCommitLog);
+
+        assertThat(reader).isNotNull();
+        assertThat(reader.getPath()).isEqualTo(COMMIT_LOG_PATH);
+
+        // Verify no premature calls before rebuffer is used
+        assertThat(testSource.requestCalls).isEmpty();
+    }
+
+    /**
+     * Asserts that the buffer held by {@code holder} was completely written with {@code length} bytes
+     * (position at the end, nothing remaining). Also asserts that those bytes are the stub sequence
+     * starting at absolute offset {@code startOffset}. The buffer is flipped to read mode as a side effect.
+     */
+    private static void assertFullyWrittenSequence(Rebufferer.BufferHolder holder, long startOffset, int length)
+    {
+        ByteBuffer buffer = holder.buffer();
+        assertThat(buffer.remaining()).isEqualTo(0);
+        assertThat(buffer.position()).isEqualTo(length);
+
+        // Flip to read mode and verify actual byte values
+        buffer.flip();
+        for (int i = 0; i < length; i++)
+        {
+            assertThat(buffer.get()).isEqualTo((byte) (startOffset + i));
+        }
+    }
+
+    /**
+     * Feeds the half-open range {@code [start, end)} to {@code consumer} in chunks of at most
+     * {@code maxChunkSize} bytes. Each byte holds the low 8 bits of its absolute offset.
+     * <p>
+     * Does nothing when {@code end <= start}. Never signals end-of-stream; callers decide when to
+     * invoke {@code onEnd()}.
+     */
+    private static void deliverBytes(StreamConsumer consumer, long start, long end, int maxChunkSize)
+    {
+        long position = start;
+        while (position < end)
+        {
+            int chunkSize = (int) Math.min(end - position, maxChunkSize);
+            Buffer data = Buffer.buffer(chunkSize);
+            for (int i = 0; i < chunkSize; i++)
+            {
+                data.appendByte((byte) (position + i));
+            }
+
+            consumer.onRead(new TestStreamBuffer(data));
+            position += chunkSize;
+        }
+    }
+
+    // Test stub classes
+
+    private static class TestCommitLog implements CommitLog
+    {
+        private final String path;
+        private final CassandraFileSource<CommitLog> source;
+        private final ICdcStats stats;
+        private long maxOffset;
+        private boolean closed = false;
+        private boolean completed = false;
+        private Throwable closeException = null;
+
+        TestCommitLog(String path, CassandraFileSource<CommitLog> source, ICdcStats stats, long maxOffset)
+        {
+            this.path = path;
+            this.source = source;
+            this.stats = stats;
+            this.maxOffset = maxOffset;
+        }
+
+        @Override
+        public String path()
+        {
+            return path;
+        }
+
+        @Override
+        public CassandraFileSource<CommitLog> source()
+        {
+            return source;
+        }
+
+        @Override
+        public ICdcStats stats()
+        {
+            return stats;
+        }
+
+        @Override
+        public long maxOffset()
+        {
+            return maxOffset;
+        }
+
+        @Override
+        public long length()
+        {
+            return maxOffset;
+        }
+
+        @Override
+        public String name()
+        {
+            return path;
+        }
+
+        @Override
+        public boolean completed()
+        {
+            return completed;
+        }
+
+        @Override
+        public CassandraInstance instance()
+        {
+            // No instance is provided; these tests never need one
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            if (closeException != null)
+            {
+                if (closeException instanceof IOException)
+                {
+                    throw (IOException) closeException;
+                }
+                throw new IOException("Close failed", closeException);
+            }
+            closed = true;
+        }
+
+        void setMaxOffset(long maxOffset)
+        {
+            this.maxOffset = maxOffset;
+        }
+
+        void setCloseException(Throwable exception)
+        {
+            this.closeException = exception;
+        }
+
+        void setCompleted(boolean completed)
+        {
+            this.completed = completed;
+        }
+
+        boolean isClosed()
+        {
+            return closed;
+        }
+    }
+
+    /**
+     * Records every requested range and, if a handler is installed, passes the request to it.
+     */
+    private static class TestCassandraFileSource implements CassandraFileSource<CommitLog>
+    {
+        final List<RequestCall> requestCalls = new ArrayList<>();
+        Consumer<RequestCall> requestHandler = null;
+        private TestCommitLog commitLog;
+
+        @Override
+        public void request(long start, long end, StreamConsumer consumer)
+        {
+            RequestCall call = new RequestCall(start, end, consumer);
+            requestCalls.add(call);
+            if (requestHandler != null)
+            {
+                requestHandler.accept(call);
+            }
+        }
+
+        @Override
+        public CommitLog cassandraFile()
+        {
+            return commitLog;
+        }
+
+        @Override
+        public FileType fileType()
+        {
+            return FileType.COMMITLOG;
+        }
+
+        @Override
+        public long size()
+        {
+            return commitLog != null ? commitLog.length() : 0L;
+        }
+
+        void setCommitLog(TestCommitLog commitLog)
+        {
+            this.commitLog = commitLog;
+        }
+
+        void setRequestHandler(Consumer<RequestCall> handler)
+        {
+            this.requestHandler = handler;
+        }
+
+        static class RequestCall
+        {
+            final long start;
+            final long end;
+            final StreamConsumer consumer;
+
+            RequestCall(long start, long end, StreamConsumer consumer)
+            {
+                this.start = start;
+                this.end = end;
+                this.consumer = consumer;
+            }
+        }
+    }
+
+    /**
+     * {@code StreamBuffer} backed by a Vert.x {@code Buffer}.
+     */
+    private static class TestStreamBuffer implements StreamBuffer
+    {
+        private final Buffer buffer;
+
+        TestStreamBuffer(Buffer buffer)
+        {
+            this.buffer = buffer;
+        }
+
+        @Override
+        public int readableBytes()
+        {
+            return buffer.length();
+        }
+
+        @Override
+        public void getBytes(int sourceOffset, ByteBuffer destination, int length)
+        {
+            // Intentionally does not flip destination. Its position is left advanced by length, which is
+            // the state the rebuffer tests in this class assert before flipping the buffer themselves.
+            destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
+        }
+
+        @Override
+        public void getBytes(int sourceOffset, byte[] destination, int destinationIndex, int length)
+        {
+            buffer.getBytes(sourceOffset, sourceOffset + length, destination, destinationIndex);
+        }
+
+        @Override
+        public byte getByte(int index)
+        {
+            return buffer.getByte(index);
+        }
+
+        @Override
+        public void release()
+        {
+            // No-op for test implementation
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to