This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 12c5495d2f70e0775c6d8d1ffbd00123cc8923ed
Author: Szymon Miężał <szymon.mie...@datastax.com>
AuthorDate: Thu Apr 24 15:06:55 2025 +0200

    Reading mmapped trie-index exceeding 2GiB results in exception
    
    Memory-mapping is done in buffers of size less than 2GiB.
    When these buffers aren't aligned to 4KiB and the trie-index file
    spans multiple buffers, reading it results in going out of buffer
    bounds.
    
    This patch fixes it by making sure that the buffers are correctly
    aligned.
    
    patch by Szymon Miezal; reviewed by blambov and brandonwilliams for CASSANDRA-20351
---
 .../org/apache/cassandra/io/util/FileHandle.java   |  6 +-
 .../apache/cassandra/io/util/MmappedRegions.java   | 53 +++++++-------
 .../cassandra/io/util/MmappedRegionsCache.java     | 10 +--
 .../cassandra/io/util/MmappedRegionsTest.java      | 81 ++++++++++++++++------
 4 files changed, 93 insertions(+), 57 deletions(-)

diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java 
b/src/java/org/apache/cassandra/io/util/FileHandle.java
index 67bfd239d6..7e4b214128 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -406,14 +406,14 @@ public class FileHandle extends SharedCloseableImpl
                 {
                     if (compressionMetadata != null)
                     {
-                        regions = mmappedRegionsCache != null ? 
mmappedRegionsCache.getOrCreate(channel, compressionMetadata)
+                        regions = mmappedRegionsCache != null ? 
mmappedRegionsCache.getOrCreate(channel, compressionMetadata, bufferSize)
                                                               : 
MmappedRegions.map(channel, compressionMetadata);
                         rebuffererFactory = maybeCached(new 
CompressedChunkReader.Mmap(channel, compressionMetadata, regions, 
crcCheckChanceSupplier));
                     }
                     else
                     {
-                        regions = mmappedRegionsCache != null ? 
mmappedRegionsCache.getOrCreate(channel, length)
-                                                              : 
MmappedRegions.map(channel, length);
+                        regions = mmappedRegionsCache != null ? 
mmappedRegionsCache.getOrCreate(channel, length, bufferSize)
+                                                              : 
MmappedRegions.map(channel, length, bufferSize);
                         rebuffererFactory = new MmapRebufferer(channel, 
length, regions);
                     }
                 }
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java 
b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
index 0ab07b8d0f..578217e279 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -65,27 +65,22 @@ public class MmappedRegions extends SharedCloseableImpl
      */
     private volatile State copy;
 
-    private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, 
long length)
-    {
-        this(new State(channel), metadata, length);
-    }
-
-    private MmappedRegions(State state, CompressionMetadata metadata, long 
length)
+    private MmappedRegions(State state, long length, int chunkSize)
     {
         super(new Tidier(state));
-
         this.state = state;
-
-        if (metadata != null)
-        {
-            assert length == 0 : "expected no length with metadata";
-            updateState(metadata);
-        }
-        else if (length > 0)
+        if (length > 0)
         {
-            updateState(length);
+            updateState(length, chunkSize);
         }
+        this.copy = new State(state);
+    }
 
+    private MmappedRegions(State state, CompressionMetadata metadata)
+    {
+        super(new Tidier(state));
+        this.state = state;
+        updateState(metadata);
         this.copy = new State(state);
     }
 
@@ -97,7 +92,7 @@ public class MmappedRegions extends SharedCloseableImpl
 
     public static MmappedRegions empty(ChannelProxy channel)
     {
-        return new MmappedRegions(channel, null, 0);
+        return new MmappedRegions(new State(channel), 0, 0);
     }
 
     /**
@@ -109,16 +104,16 @@ public class MmappedRegions extends SharedCloseableImpl
     {
         if (metadata == null)
             throw new IllegalArgumentException("metadata cannot be null");
-
-        return new MmappedRegions(channel, metadata, 0);
+        State state = new State(channel);
+        return new MmappedRegions(state, metadata);
     }
 
-    public static MmappedRegions map(ChannelProxy channel, long length)
+    public static MmappedRegions map(ChannelProxy channel, long length, int 
chunkSize)
     {
         if (length <= 0)
             throw new IllegalArgumentException("Length must be positive");
-
-        return new MmappedRegions(channel, null, length);
+        State state = new State(channel);
+        return new MmappedRegions(state, length, chunkSize);
     }
 
     /**
@@ -140,8 +135,10 @@ public class MmappedRegions extends SharedCloseableImpl
      *
      * @return {@code true} if new regions have been created
      */
-    public boolean extend(long length)
+    public boolean extend(long length, int chunkSize)
     {
+        // We cannot enforce length to be a multiple of chunkSize (at the very 
least the last extend on a file
+        // will not satisfy this), so we hope the caller knows what they are 
doing.
         if (length < 0)
             throw new IllegalArgumentException("Length must not be negative");
 
@@ -151,7 +148,7 @@ public class MmappedRegions extends SharedCloseableImpl
             return false;
 
         int initialRegions = state.last;
-        updateState(length);
+        updateState(length, chunkSize);
         copy = new State(state);
         return state.last > initialRegions;
     }
@@ -162,7 +159,7 @@ public class MmappedRegions extends SharedCloseableImpl
      *
      * @return {@code true} if new regions have been created
      */
-    public boolean extend(CompressionMetadata compressionMetadata)
+    public boolean extend(CompressionMetadata compressionMetadata, int 
chunkSize)
     {
         assert !isCopy() : "Copies cannot be extended";
 
@@ -171,7 +168,7 @@ public class MmappedRegions extends SharedCloseableImpl
 
         int initialRegions = state.last;
         if (compressionMetadata.compressedFileLength - state.length <= 
MAX_SEGMENT_SIZE)
-            updateState(compressionMetadata.compressedFileLength);
+            updateState(compressionMetadata.compressedFileLength, chunkSize);
         else
             updateState(compressionMetadata);
 
@@ -183,13 +180,15 @@ public class MmappedRegions extends SharedCloseableImpl
      * Updates state by adding the remaining segments. It starts with the 
current state last segment end position and
      * subsequently add new segments until all data up to the provided length 
are mapped.
      */
-    private void updateState(long length)
+    private void updateState(long length, int chunkSize)
     {
+        // make sure the regions span whole chunks
+        long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize;
         state.length = length;
         long pos = state.getPosition();
         while (pos < length)
         {
-            long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
+            long size = Math.min(maxSize, length - pos);
             state.add(pos, size);
             pos += size;
         }
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java 
b/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java
index dff9561f4f..e3ebc34609 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java
@@ -45,12 +45,12 @@ public class MmappedRegionsCache implements AutoCloseable
      * @param length  length of the file
      * @return a shared copy of the cached mmapped regions
      */
-    public MmappedRegions getOrCreate(ChannelProxy channel, long length)
+    public MmappedRegions getOrCreate(ChannelProxy channel, long length, int 
bufferSize)
     {
         Preconditions.checkState(!closed);
-        MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored 
-> MmappedRegions.map(channel, length));
+        MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored 
-> MmappedRegions.map(channel, length, bufferSize));
         Preconditions.checkArgument(regions.isValid(channel));
-        regions.extend(length);
+        regions.extend(length, bufferSize);
         return regions.sharedCopy();
     }
 
@@ -62,12 +62,12 @@ public class MmappedRegionsCache implements AutoCloseable
      * @param metadata compression metadata of the file
      * @return a shared copy of the cached mmapped regions
      */
-    public MmappedRegions getOrCreate(ChannelProxy channel, 
CompressionMetadata metadata)
+    public MmappedRegions getOrCreate(ChannelProxy channel, 
CompressionMetadata metadata, int bufferSize)
     {
         Preconditions.checkState(!closed);
         MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored 
-> MmappedRegions.map(channel, metadata));
         Preconditions.checkArgument(regions.isValid(channel));
-        regions.extend(metadata);
+        regions.extend(metadata, bufferSize);
         return regions.sharedCopy();
     }
 
diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java 
b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
index e6b5dd0c09..af4c6f042d 100644
--- a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
@@ -114,10 +114,11 @@ public class MmappedRegionsTest
     public void testTwoSegments() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(2048);
+        int bufSize = 1024;
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testTwoSegments", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(1024);
+            regions.extend(1024, bufSize);
             for (int i = 0; i < 1024; i++)
             {
                 MmappedRegions.Region region = regions.floor(i);
@@ -126,7 +127,7 @@ public class MmappedRegionsTest
                 assertEquals(1024, region.end());
             }
 
-            regions.extend(2048);
+            regions.extend(2048, bufSize);
             for (int i = 0; i < 2048; i++)
             {
                 MmappedRegions.Region region = regions.floor(i);
@@ -149,14 +150,15 @@ public class MmappedRegionsTest
     public void testSmallSegmentSize() throws Exception
     {
         MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+        int bufSize = 1024;
 
         ByteBuffer buffer = allocateBuffer(4096);
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testSmallSegmentSize", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(1024);
-            regions.extend(2048);
-            regions.extend(4096);
+            regions.extend(1024, bufSize);
+            regions.extend(2048, bufSize);
+            regions.extend(4096, bufSize);
 
             final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
             for (int i = 0; i < buffer.capacity(); i++)
@@ -169,17 +171,45 @@ public class MmappedRegionsTest
         }
     }
 
+    @Test
+    public void testSizeIsChunkMultiple() throws Exception
+    {
+        final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
+        final int bufSize = 1024;
+        MmappedRegions.MAX_SEGMENT_SIZE = 2047;
+        ByteBuffer buffer = allocateBuffer(4096);
+        try(ChannelProxy channel = new 
ChannelProxy(writeFile("testSmallSegmentSize", buffer));
+            MmappedRegions regions = MmappedRegions.empty(channel))
+        {
+            regions.extend(1024, bufSize);
+            regions.extend(2048, bufSize);
+            regions.extend(4096, bufSize);
+            for (int i = 0; i < buffer.capacity(); i++)
+            {
+                MmappedRegions.Region region = regions.floor(i);
+                assertNotNull(region);
+                assertEquals(bufSize * (i / bufSize), region.offset());
+                assertEquals(bufSize + (bufSize * (i / bufSize)), 
region.end());
+            }
+        }
+        finally
+        {
+            MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize;
+        }
+    }
+
     @Test
     public void testAllocRegions() throws Exception
     {
         MmappedRegions.MAX_SEGMENT_SIZE = 1024;
 
         ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * 
MmappedRegions.REGION_ALLOC_SIZE * 3);
+        int bufSize = 1024;
 
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testAllocRegions", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(buffer.capacity());
+            regions.extend(buffer.capacity(), bufSize);
 
             final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
             for (int i = 0; i < buffer.capacity(); i++)
@@ -196,17 +226,18 @@ public class MmappedRegionsTest
     public void testCopy() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(128 * 1024);
+        int bufSize = 4096;
 
         MmappedRegions snapshot;
         ChannelProxy channelCopy;
 
         try (ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", 
buffer));
-             MmappedRegions regions = MmappedRegions.map(channel, 
buffer.capacity() / 4))
+             MmappedRegions regions = MmappedRegions.map(channel, 
buffer.capacity() / 4, bufSize))
         {
             // create 3 more segments, one per quater capacity
-            regions.extend(buffer.capacity() / 2);
-            regions.extend(3 * buffer.capacity() / 4);
-            regions.extend(buffer.capacity());
+            regions.extend(buffer.capacity() / 2, bufSize);
+            regions.extend(3 * buffer.capacity() / 4, bufSize);
+            regions.extend(buffer.capacity(), bufSize);
 
             // make a snapshot
             snapshot = regions.sharedCopy();
@@ -238,6 +269,7 @@ public class MmappedRegionsTest
     public void testCopyCannotExtend() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(128 * 1024);
+        int bufSize = 1024;
 
         MmappedRegions snapshot;
         ChannelProxy channelCopy;
@@ -245,7 +277,7 @@ public class MmappedRegionsTest
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(buffer.capacity() / 2);
+            regions.extend(buffer.capacity() / 2, bufSize);
 
             // make a snapshot
             snapshot = regions.sharedCopy();
@@ -256,7 +288,7 @@ public class MmappedRegionsTest
 
         try
         {
-            snapshot.extend(buffer.capacity());
+            snapshot.extend(buffer.capacity(), bufSize);
         }
         finally
         {
@@ -269,12 +301,13 @@ public class MmappedRegionsTest
     public void testExtendOutOfOrder() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(4096);
+        int bufSize = 1024;
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(4096);
-            regions.extend(1024);
-            regions.extend(2048);
+            regions.extend(4096, bufSize);
+            regions.extend(1024, bufSize);
+            regions.extend(2048, bufSize);
 
             for (int i = 0; i < buffer.capacity(); i++)
             {
@@ -290,10 +323,11 @@ public class MmappedRegionsTest
     public void testNegativeExtend() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(1024);
+        int bufSize = 1024;
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testNegativeExtend", buffer));
              MmappedRegions regions = MmappedRegions.empty(channel))
         {
-            regions.extend(-1);
+            regions.extend(-1, bufSize);
         }
     }
 
@@ -341,8 +375,9 @@ public class MmappedRegionsTest
     public void testIllegalArgForMap1() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(1024);
+        int bufSize = 1024;
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
-             MmappedRegions regions = MmappedRegions.map(channel, 0))
+             MmappedRegions regions = MmappedRegions.map(channel, 0, bufSize))
         {
             assertTrue(regions.isEmpty());
         }
@@ -352,8 +387,9 @@ public class MmappedRegionsTest
     public void testIllegalArgForMap2() throws Exception
     {
         ByteBuffer buffer = allocateBuffer(1024);
+        int bufSize = 1024;
         try (ChannelProxy channel = new 
ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
-             MmappedRegions regions = MmappedRegions.map(channel, -1L))
+             MmappedRegions regions = MmappedRegions.map(channel, -1L, 
bufSize))
         {
             assertTrue(regions.isEmpty());
         }
@@ -382,6 +418,7 @@ public class MmappedRegionsTest
     {
         MmappedRegions.MAX_SEGMENT_SIZE = maxSegmentSize << 10;
         int size = Arrays.stream(writeSizes).sum() << 10;
+        int bufSize = 4096;
 
         ByteBuffer buffer = allocateBuffer(size);
         File f = FileUtils.createTempFile("testMapForCompressionMetadata", 
"1");
@@ -423,10 +460,10 @@ public class MmappedRegionsTest
                     writer.sync();
 
                     // verify that calling extend for the same (first 
iteration) or some previous metadata (further iterations) has no effect
-                    assertFalse(regions.extend(metadata));
+                    assertFalse(regions.extend(metadata, bufSize));
 
                     logger.info("Checking extend on compressed chunk for 
range={} {}..{} / {}", idx, pos, pos + (writeSizes[idx] << 10), size);
-                    checkExtendOnCompressedChunks(f, writer, regions);
+                    checkExtendOnCompressedChunks(f, writer, regions, bufSize);
                     pos += writeSizes[idx] << 10;
                     idx++;
                 }
@@ -434,12 +471,12 @@ public class MmappedRegionsTest
         }
     }
 
-    private void checkExtendOnCompressedChunks(File f, 
CompressedSequentialWriter writer, MmappedRegions regions)
+    private void checkExtendOnCompressedChunks(File f, 
CompressedSequentialWriter writer, MmappedRegions regions, int bufSize)
     {
         int dataOffset;
         try (CompressionMetadata metadata = 
writer.open(writer.getLastFlushOffset()))
         {
-            regions.extend(metadata);
+            regions.extend(metadata, bufSize);
             assertFalse(regions.isEmpty());
             dataOffset = 0;
             while (dataOffset < metadata.dataLength)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to