This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 4f25e58f26 zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space
4f25e58f26 is described below

commit 4f25e58f267e68df945e0ba47b2a684c1fb33a85
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri May 2 14:23:17 2025 -0700

    zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space
    
    patch by David Capwell; reviewed by Yifan Cai for CASSANDRA-20577
---
 CHANGES.txt                                        |  1 +
 .../io/sstable/SSTableZeroCopyWriter.java          | 47 +++++++++++++++++-----
 .../apache/cassandra/io/util/SequentialWriter.java |  9 ++++-
 3 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 08e0ecfa16..d30a383b7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space (CASSANDRA-20577)
  * CQLSSTableWriter supports setting the format (BTI or Big) (CASSANDRA-20609)
  * Don't allocate in ThreadLocalReadAheadBuffer#close() (CASSANDRA-20551)
  * Ensure RowFilter#isMutableIntersection() properly evaluates numeric ranges on a single column (CASSANDRA-20566)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 3bf21f1155..46a490974e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -38,9 +38,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
@@ -50,7 +53,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     private static final Logger logger = LoggerFactory.getLogger(SSTableZeroCopyWriter.class);
 
     private volatile SSTableReader finalReader;
-    private final Map<String, SequentialWriter> componentWriters; // indexed by component name
+    private final Map<String, ZeroCopySequentialWriter> componentWriters; // indexed by component name
 
     public SSTableZeroCopyWriter(Builder<?, ?> builder,
                                  LifecycleNewTracker lifecycleNewTracker,
@@ -89,12 +92,12 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
         throw new UnsupportedOperationException();
     }
 
-    private SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    private ZeroCopySequentialWriter makeWriter(Descriptor descriptor, Component component)
     {
-        return new SequentialWriter(descriptor.fileFor(component), ioOptions.writerOptions, false);
+        return new ZeroCopySequentialWriter(descriptor.fileFor(component), ioOptions.writerOptions, false);
     }
 
-    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    private void write(DataInputPlus in, long size, ZeroCopySequentialWriter out) throws FSWriteError
     {
         final int BUFFER_SIZE = 1 << 20;
         long bytesRead = 0;
@@ -128,7 +131,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     {
         setOpenResult(openResult);
 
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.finish();
 
         return finished();
@@ -170,7 +173,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public Throwable commit(Throwable accumulate)
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             accumulate = writer.commit(accumulate);
         return accumulate;
     }
@@ -178,7 +181,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public Throwable abort(Throwable accumulate)
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             accumulate = writer.abort(accumulate);
         return accumulate;
     }
@@ -186,29 +189,30 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public void prepareToCommit()
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.prepareToCommit();
     }
 
     @Override
     public void close()
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.close();
     }
 
     public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        SequentialWriter writer = componentWriters.get(component.name);
+        ZeroCopySequentialWriter writer = componentWriters.get(component.name);
         logger.info("Writing component {} to {} length {}", component, writer.getPath(), prettyPrintMemory(size));
 
         if (in instanceof AsyncStreamingInputPlus)
             write((AsyncStreamingInputPlus) in, size, writer);
         else
+            // this code path is not valid for production and only exists to simplify unit tests
             write(in, size, writer);
     }
 
-    private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException
+    private void write(AsyncStreamingInputPlus in, long size, ZeroCopySequentialWriter writer) throws ClosedChannelException
     {
         logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
 
@@ -233,4 +237,25 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
             throw new FSWriteError(e, writer.getPath());
         }
     }
+
+    private static class ZeroCopySequentialWriter extends SequentialWriter
+    {
+        private ZeroCopySequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
+        {
+            super(file, ByteBufferUtil.EMPTY_BYTE_BUFFER, option, strictFlushing);
+        }
+
+        /**
+         * In production, we do not expect this method to be called, as only writeDirectlyToChannel should be invoked for zero-copy.
+         * <p>
+         * This method only exists for tests.
+         */
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            if (this.buffer == ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                this.buffer = option.allocateBuffer();
+            super.write(b, off, len);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 69643be987..c3a90732ee 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -56,7 +56,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
 
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
-    private final SequentialWriterOption option;
+    protected final SequentialWriterOption option;
     private int bytesSinceTrickleFsync = 0;
 
     protected long lastFlushOffset;
@@ -163,7 +163,12 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
      */
    public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
     {
-        super(openChannel(file), option.allocateBuffer());
+        this(file, option.allocateBuffer(), option, strictFlushing);
+    }
+
+    protected SequentialWriter(File file, ByteBuffer buffer, SequentialWriterOption option, boolean strictFlushing)
+    {
+        super(openChannel(file), buffer);
         this.strictFlushing = strictFlushing;
         this.fchannel = (FileChannel)channel;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to