This is an automated email from the ASF dual-hosted git repository. brandonwilliams pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 12c5495d2f70e0775c6d8d1ffbd00123cc8923ed Author: Szymon Miężał <szymon.mie...@datastax.com> AuthorDate: Thu Apr 24 15:06:55 2025 +0200 Reading mmapped trie-index exceeding 2GiB results in exception. Memory-mapping is done in buffers of size less than 2GiB. When these buffers aren't aligned to 4KiB and the trie-index file spans many buffers, reading it results in going out of buffer bounds. This patch fixes it by making sure that the buffers are correctly aligned. patch by Szymon Miezal; reviewed by blambov and brandonwilliams for CASSANDRA-20351 --- .../org/apache/cassandra/io/util/FileHandle.java | 6 +- .../apache/cassandra/io/util/MmappedRegions.java | 53 +++++++------- .../cassandra/io/util/MmappedRegionsCache.java | 10 +-- .../cassandra/io/util/MmappedRegionsTest.java | 81 ++++++++++++++++------ 4 files changed, 93 insertions(+), 57 deletions(-) diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java index 67bfd239d6..7e4b214128 100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@ -406,14 +406,14 @@ public class FileHandle extends SharedCloseableImpl { if (compressionMetadata != null) { - regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, compressionMetadata) + regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, compressionMetadata, bufferSize) : MmappedRegions.map(channel, compressionMetadata); rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channel, compressionMetadata, regions, crcCheckChanceSupplier)); } else { - regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, length) - : MmappedRegions.map(channel, length); + regions = mmappedRegionsCache != null ? 
mmappedRegionsCache.getOrCreate(channel, length, bufferSize) + : MmappedRegions.map(channel, length, bufferSize); rebuffererFactory = new MmapRebufferer(channel, length, regions); } } diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java index 0ab07b8d0f..578217e279 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java +++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java @@ -65,27 +65,22 @@ public class MmappedRegions extends SharedCloseableImpl */ private volatile State copy; - private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length) - { - this(new State(channel), metadata, length); - } - - private MmappedRegions(State state, CompressionMetadata metadata, long length) + private MmappedRegions(State state, long length, int chunkSize) { super(new Tidier(state)); - this.state = state; - - if (metadata != null) - { - assert length == 0 : "expected no length with metadata"; - updateState(metadata); - } - else if (length > 0) + if (length > 0) { - updateState(length); + updateState(length, chunkSize); } + this.copy = new State(state); + } + private MmappedRegions(State state, CompressionMetadata metadata) + { + super(new Tidier(state)); + this.state = state; + updateState(metadata); this.copy = new State(state); } @@ -97,7 +92,7 @@ public class MmappedRegions extends SharedCloseableImpl public static MmappedRegions empty(ChannelProxy channel) { - return new MmappedRegions(channel, null, 0); + return new MmappedRegions(new State(channel), 0, 0); } /** @@ -109,16 +104,16 @@ public class MmappedRegions extends SharedCloseableImpl { if (metadata == null) throw new IllegalArgumentException("metadata cannot be null"); - - return new MmappedRegions(channel, metadata, 0); + State state = new State(channel); + return new MmappedRegions(state, metadata); } - public static MmappedRegions map(ChannelProxy channel, long length) + public static 
MmappedRegions map(ChannelProxy channel, long length, int chunkSize) { if (length <= 0) throw new IllegalArgumentException("Length must be positive"); - - return new MmappedRegions(channel, null, length); + State state = new State(channel); + return new MmappedRegions(state, length, chunkSize); } /** @@ -140,8 +135,10 @@ public class MmappedRegions extends SharedCloseableImpl * * @return {@code true} if new regions have been created */ - public boolean extend(long length) + public boolean extend(long length, int chunkSize) { + // We cannot enforce length to be a multiple of chunkSize (at the very least the last extend on a file + // will not satisfy this), so we hope the caller knows what they are doing. if (length < 0) throw new IllegalArgumentException("Length must not be negative"); @@ -151,7 +148,7 @@ public class MmappedRegions extends SharedCloseableImpl return false; int initialRegions = state.last; - updateState(length); + updateState(length, chunkSize); copy = new State(state); return state.last > initialRegions; } @@ -162,7 +159,7 @@ public class MmappedRegions extends SharedCloseableImpl * * @return {@code true} if new regions have been created */ - public boolean extend(CompressionMetadata compressionMetadata) + public boolean extend(CompressionMetadata compressionMetadata, int chunkSize) { assert !isCopy() : "Copies cannot be extended"; @@ -171,7 +168,7 @@ public class MmappedRegions extends SharedCloseableImpl int initialRegions = state.last; if (compressionMetadata.compressedFileLength - state.length <= MAX_SEGMENT_SIZE) - updateState(compressionMetadata.compressedFileLength); + updateState(compressionMetadata.compressedFileLength, chunkSize); else updateState(compressionMetadata); @@ -183,13 +180,15 @@ public class MmappedRegions extends SharedCloseableImpl * Updates state by adding the remaining segments. 
It starts with the current state last segment end position and * subsequently add new segments until all data up to the provided length are mapped. */ - private void updateState(long length) + private void updateState(long length, int chunkSize) { + // make sure the regions span whole chunks + long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize; state.length = length; long pos = state.getPosition(); while (pos < length) { - long size = Math.min(MAX_SEGMENT_SIZE, length - pos); + long size = Math.min(maxSize, length - pos); state.add(pos, size); pos += size; } diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java b/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java index dff9561f4f..e3ebc34609 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java +++ b/src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java @@ -45,12 +45,12 @@ public class MmappedRegionsCache implements AutoCloseable * @param length length of the file * @return a shared copy of the cached mmapped regions */ - public MmappedRegions getOrCreate(ChannelProxy channel, long length) + public MmappedRegions getOrCreate(ChannelProxy channel, long length, int bufferSize) { Preconditions.checkState(!closed); - MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, length)); + MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, length, bufferSize)); Preconditions.checkArgument(regions.isValid(channel)); - regions.extend(length); + regions.extend(length, bufferSize); return regions.sharedCopy(); } @@ -62,12 +62,12 @@ public class MmappedRegionsCache implements AutoCloseable * @param metadata compression metadata of the file * @return a shared copy of the cached mmapped regions */ - public MmappedRegions getOrCreate(ChannelProxy channel, CompressionMetadata metadata) + public MmappedRegions getOrCreate(ChannelProxy channel, 
CompressionMetadata metadata, int bufferSize) { Preconditions.checkState(!closed); MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, metadata)); Preconditions.checkArgument(regions.isValid(channel)); - regions.extend(metadata); + regions.extend(metadata, bufferSize); return regions.sharedCopy(); } diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java index e6b5dd0c09..af4c6f042d 100644 --- a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java +++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java @@ -114,10 +114,11 @@ public class MmappedRegionsTest public void testTwoSegments() throws Exception { ByteBuffer buffer = allocateBuffer(2048); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(1024); + regions.extend(1024, bufSize); for (int i = 0; i < 1024; i++) { MmappedRegions.Region region = regions.floor(i); @@ -126,7 +127,7 @@ public class MmappedRegionsTest assertEquals(1024, region.end()); } - regions.extend(2048); + regions.extend(2048, bufSize); for (int i = 0; i < 2048; i++) { MmappedRegions.Region region = regions.floor(i); @@ -149,14 +150,15 @@ public class MmappedRegionsTest public void testSmallSegmentSize() throws Exception { MmappedRegions.MAX_SEGMENT_SIZE = 1024; + int bufSize = 1024; ByteBuffer buffer = allocateBuffer(4096); try (ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(1024); - regions.extend(2048); - regions.extend(4096); + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); + regions.extend(4096, bufSize); final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; for (int i = 0; i < buffer.capacity(); i++) @@ -169,17 +171,45 @@ public class 
MmappedRegionsTest } } + @Test + public void testSizeIsChunkMultiple() throws Exception + { + final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE; + final int bufSize = 1024; + MmappedRegions.MAX_SEGMENT_SIZE = 2047; + ByteBuffer buffer = allocateBuffer(4096); + try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); + regions.extend(4096, bufSize); + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(bufSize * (i / bufSize), region.offset()); + assertEquals(bufSize + (bufSize * (i / bufSize)), region.end()); + } + } + finally + { + MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize; + } + } + @Test public void testAllocRegions() throws Exception { MmappedRegions.MAX_SEGMENT_SIZE = 1024; ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(buffer.capacity()); + regions.extend(buffer.capacity(), bufSize); final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; for (int i = 0; i < buffer.capacity(); i++) @@ -196,17 +226,18 @@ public class MmappedRegionsTest public void testCopy() throws Exception { ByteBuffer buffer = allocateBuffer(128 * 1024); + int bufSize = 4096; MmappedRegions snapshot; ChannelProxy channelCopy; try (ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4)) + MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, bufSize)) { // create 3 more segments, one per quater capacity - regions.extend(buffer.capacity() / 2); - regions.extend(3 * 
buffer.capacity() / 4); - regions.extend(buffer.capacity()); + regions.extend(buffer.capacity() / 2, bufSize); + regions.extend(3 * buffer.capacity() / 4, bufSize); + regions.extend(buffer.capacity(), bufSize); // make a snapshot snapshot = regions.sharedCopy(); @@ -238,6 +269,7 @@ public class MmappedRegionsTest public void testCopyCannotExtend() throws Exception { ByteBuffer buffer = allocateBuffer(128 * 1024); + int bufSize = 1024; MmappedRegions snapshot; ChannelProxy channelCopy; @@ -245,7 +277,7 @@ public class MmappedRegionsTest try (ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(buffer.capacity() / 2); + regions.extend(buffer.capacity() / 2, bufSize); // make a snapshot snapshot = regions.sharedCopy(); @@ -256,7 +288,7 @@ public class MmappedRegionsTest try { - snapshot.extend(buffer.capacity()); + snapshot.extend(buffer.capacity(), bufSize); } finally { @@ -269,12 +301,13 @@ public class MmappedRegionsTest public void testExtendOutOfOrder() throws Exception { ByteBuffer buffer = allocateBuffer(4096); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(4096); - regions.extend(1024); - regions.extend(2048); + regions.extend(4096, bufSize); + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); for (int i = 0; i < buffer.capacity(); i++) { @@ -290,10 +323,11 @@ public class MmappedRegionsTest public void testNegativeExtend() throws Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(-1); + regions.extend(-1, bufSize); } } @@ -341,8 +375,9 @@ public class MmappedRegionsTest public void testIllegalArgForMap1() throws 
Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, 0)) + MmappedRegions regions = MmappedRegions.map(channel, 0, bufSize)) { assertTrue(regions.isEmpty()); } @@ -352,8 +387,9 @@ public class MmappedRegionsTest public void testIllegalArgForMap2() throws Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try (ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, -1L)) + MmappedRegions regions = MmappedRegions.map(channel, -1L, bufSize)) { assertTrue(regions.isEmpty()); } @@ -382,6 +418,7 @@ public class MmappedRegionsTest { MmappedRegions.MAX_SEGMENT_SIZE = maxSegmentSize << 10; int size = Arrays.stream(writeSizes).sum() << 10; + int bufSize = 4096; ByteBuffer buffer = allocateBuffer(size); File f = FileUtils.createTempFile("testMapForCompressionMetadata", "1"); @@ -423,10 +460,10 @@ public class MmappedRegionsTest writer.sync(); // verify that calling extend for the same (first iteration) or some previous metadata (further iterations) has no effect - assertFalse(regions.extend(metadata)); + assertFalse(regions.extend(metadata, bufSize)); logger.info("Checking extend on compressed chunk for range={} {}..{} / {}", idx, pos, pos + (writeSizes[idx] << 10), size); - checkExtendOnCompressedChunks(f, writer, regions); + checkExtendOnCompressedChunks(f, writer, regions, bufSize); pos += writeSizes[idx] << 10; idx++; } @@ -434,12 +471,12 @@ public class MmappedRegionsTest } } - private void checkExtendOnCompressedChunks(File f, CompressedSequentialWriter writer, MmappedRegions regions) + private void checkExtendOnCompressedChunks(File f, CompressedSequentialWriter writer, MmappedRegions regions, int bufSize) { int dataOffset; try (CompressionMetadata metadata = 
writer.open(writer.getLastFlushOffset())) { - regions.extend(metadata); + regions.extend(metadata, bufSize); assertFalse(regions.isEmpty()); dataOffset = 0; while (dataOffset < metadata.dataLength) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org