This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 4f25e58f26 zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space
4f25e58f26 is described below

commit 4f25e58f267e68df945e0ba47b2a684c1fb33a85
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri May 2 14:23:17 2025 -0700

    zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space

    patch by David Capwell; reviewed by Yifan Cai for CASSANDRA-20577
---
 CHANGES.txt                                        |  1 +
 .../io/sstable/SSTableZeroCopyWriter.java          | 47 +++++++++++++++++-----
 .../apache/cassandra/io/util/SequentialWriter.java |  9 ++++-
 3 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 08e0ecfa16..d30a383b7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space (CASSANDRA-20577)
 * CQLSSTableWriter supports setting the format (BTI or Big) (CASSANDRA-20609)
 * Don't allocate in ThreadLocalReadAheadBuffer#close() (CASSANDRA-20551)
 * Ensure RowFilter#isMutableIntersection() properly evaluates numeric ranges on a single column (CASSANDRA-20566)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 3bf21f1155..46a490974e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -38,9 +38,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
@@ -50,7 +53,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     private static final Logger logger = LoggerFactory.getLogger(SSTableZeroCopyWriter.class);
 
     private volatile SSTableReader finalReader;
-    private final Map<String, SequentialWriter> componentWriters; // indexed by component name
+    private final Map<String, ZeroCopySequentialWriter> componentWriters; // indexed by component name
 
     public SSTableZeroCopyWriter(Builder<?, ?> builder,
                                  LifecycleNewTracker lifecycleNewTracker,
@@ -89,12 +92,12 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
         throw new UnsupportedOperationException();
     }
 
-    private SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    private ZeroCopySequentialWriter makeWriter(Descriptor descriptor, Component component)
     {
-        return new SequentialWriter(descriptor.fileFor(component), ioOptions.writerOptions, false);
+        return new ZeroCopySequentialWriter(descriptor.fileFor(component), ioOptions.writerOptions, false);
     }
 
-    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    private void write(DataInputPlus in, long size, ZeroCopySequentialWriter out) throws FSWriteError
     {
         final int BUFFER_SIZE = 1 << 20;
         long bytesRead = 0;
@@ -128,7 +131,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     {
         setOpenResult(openResult);
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.finish();
 
         return finished();
@@ -170,7 +173,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public Throwable commit(Throwable accumulate)
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             accumulate = writer.commit(accumulate);
         return accumulate;
     }
@@ -178,7 +181,7 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public Throwable abort(Throwable accumulate)
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             accumulate = writer.abort(accumulate);
         return accumulate;
     }
@@ -186,29 +189,30 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
     @Override
     public void prepareToCommit()
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.prepareToCommit();
     }
 
     @Override
     public void close()
     {
-        for (SequentialWriter writer : componentWriters.values())
+        for (ZeroCopySequentialWriter writer : componentWriters.values())
             writer.close();
     }
 
     public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        SequentialWriter writer = componentWriters.get(component.name);
+        ZeroCopySequentialWriter writer = componentWriters.get(component.name);
         logger.info("Writing component {} to {} length {}", component, writer.getPath(), prettyPrintMemory(size));
 
         if (in instanceof AsyncStreamingInputPlus)
             write((AsyncStreamingInputPlus) in, size, writer);
         else
+            // this code path is not valid for production and only exists to simplify unit tests
            write(in, size, writer);
     }
 
-    private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException
+    private void write(AsyncStreamingInputPlus in, long size, ZeroCopySequentialWriter writer) throws ClosedChannelException
     {
         logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
 
@@ -233,4 +237,25 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
             throw new FSWriteError(e, writer.getPath());
         }
     }
+
+    private static class ZeroCopySequentialWriter extends SequentialWriter
+    {
+        private ZeroCopySequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
+        {
+            super(file, ByteBufferUtil.EMPTY_BYTE_BUFFER, option, strictFlushing);
+        }
+
+        /**
+         * In production, we do not expect this method to be called, as only writeDirectlyToChannel should be invoked for zero-copy.
+         * <p>
+         * This method only exists for tests.
+         */
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            if (this.buffer == ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                this.buffer = option.allocateBuffer();
+            super.write(b, off, len);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 69643be987..c3a90732ee 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -56,7 +56,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
 
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
-    private final SequentialWriterOption option;
+    protected final SequentialWriterOption option;
     private int bytesSinceTrickleFsync = 0;
 
     protected long lastFlushOffset;
@@ -163,7 +163,12 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
      */
     public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
     {
-        super(openChannel(file), option.allocateBuffer());
+        this(file, option.allocateBuffer(), option, strictFlushing);
+    }
+
+    protected SequentialWriter(File file, ByteBuffer buffer, SequentialWriterOption option, boolean strictFlushing)
+    {
+        super(openChannel(file), buffer);
         this.strictFlushing = strictFlushing;
         this.fchannel = (FileChannel)channel;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
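The heart of the patch is a lazy-allocation pattern: each component writer starts with ByteBufferUtil.EMPTY_BYTE_BUFFER, the zero-copy path writes straight to the channel, and a real (direct) buffer is only allocated if the buffered write(byte[], int, int) path, used only by tests, is actually taken. The standalone sketch below illustrates that pattern outside Cassandra; the class name LazyBufferedWriter and the BUFFER_SIZE constant are illustrative assumptions, not Cassandra APIs, and this is a simplified sketch rather than the project's implementation.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Sketch of lazy buffer allocation: no direct memory is allocated unless the
    // buffered write path is actually used.
    public class LazyBufferedWriter implements AutoCloseable
    {
        // Shared zero-capacity placeholder; nothing is allocated up front.
        private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
        private static final int BUFFER_SIZE = 1 << 16; // hypothetical default

        private final FileChannel channel;
        private ByteBuffer buffer = EMPTY;

        public LazyBufferedWriter(Path path) throws IOException
        {
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }

        // Zero-copy style path: bytes go straight to the channel; the buffer is never allocated.
        public void writeDirectlyToChannel(ByteBuffer src) throws IOException
        {
            flush(); // preserve ordering if something was buffered earlier
            while (src.hasRemaining())
                channel.write(src);
        }

        // Buffered path (tests / fallback): allocate the direct buffer only on first use.
        public void write(byte[] b, int off, int len) throws IOException
        {
            if (buffer == EMPTY)
                buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
            int pos = off, remaining = len;
            while (remaining > 0)
            {
                if (!buffer.hasRemaining())
                    flush();
                int n = Math.min(remaining, buffer.remaining());
                buffer.put(b, pos, n);
                pos += n;
                remaining -= n;
            }
        }

        public void flush() throws IOException
        {
            if (buffer == EMPTY || buffer.position() == 0)
                return;
            buffer.flip();
            while (buffer.hasRemaining())
                channel.write(buffer);
            buffer.clear();
        }

        @Override
        public void close() throws IOException
        {
            flush();
            channel.close();
        }
    }

In the patch itself the placeholder is ByteBufferUtil.EMPTY_BYTE_BUFFER, the lazy allocation lives in ZeroCopySequentialWriter.write(), and SequentialWriter gains a protected constructor that accepts the buffer so the subclass can defer allocation.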