chia7712 commented on code in PR #19581: URL: https://github.com/apache/kafka/pull/19581#discussion_r2075897444
########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -209,16 +209,13 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) { * of the fetched records. Otherwise, the original records are returned. Review Comment: please update the docs. ########## clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java: ########## @@ -1068,6 +1073,148 @@ public void testUnsupportedCompress() { }); } + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testSlice(Args args) throws IOException { + // Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2, + // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple + // batches in a single MemoryRecords instance. Though with compression, we can have multiple + // appends resulting in a single batch prior RecordBatch.MAGIC_VALUE_V2 as well. + LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>(); + recordsPerOffset.put(args.firstOffset, 3); + recordsPerOffset.put(args.firstOffset + 6L, 8); + recordsPerOffset.put(args.firstOffset + 15L, 4); + MemoryRecords records = createMemoryRecords(args, recordsPerOffset); + + // Test slicing from start + Records sliced = records.slice(0, records.sizeInBytes()); + assertEquals(records.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes()); + TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches()); + + List<RecordBatch> items = batches(records); + // Test slicing first message. 
+ RecordBatch first = items.get(0); + sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + + // Read from second message and size is past the end of the file. + sliced = records.slice(first.sizeInBytes(), records.sizeInBytes()); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + + // Read from second message and position + size overflows. + sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + + // Read a single message starting from second message. + RecordBatch second = items.get(1); + sliced = records.slice(first.sizeInBytes(), second.sizeInBytes()); + assertEquals(second.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(Collections.singletonList(second), batches(sliced), "Read a single message starting from the second message"); + + // Read from already sliced view. 
+ List<RecordBatch> remainingItems = IntStream.range(0, items.size()).filter(i -> i != 0 && i != 1).mapToObj(items::get).collect(Collectors.toList()); + int remainingSize = remainingItems.stream().mapToInt(RecordBatch::sizeInBytes).sum(); + sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()) + .slice(second.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes() - second.sizeInBytes()); + assertEquals(remainingSize, sliced.sizeInBytes()); + assertEquals(remainingItems, batches(sliced), "Read starting from the third message"); + + // Read from second message and size is past the end of the file on the already sliced view. + sliced = records.slice(1, records.sizeInBytes() - 1) + .slice(first.sizeInBytes() - 1, records.sizeInBytes()); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + + // Read from second message and position + size overflows on the already sliced view. 
+ sliced = records.slice(1, records.sizeInBytes() - 1) + .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + } + + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testSliceInvalidPosition(Args args) { + MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1)); + assertThrows(IllegalArgumentException.class, () -> records.slice(-1, records.sizeInBytes())); + assertThrows(IllegalArgumentException.class, () -> records.slice(records.sizeInBytes() + 1, records.sizeInBytes())); + } + + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testSliceInvalidSize(Args args) { + MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1)); + assertThrows(IllegalArgumentException.class, () -> records.slice(0, -1)); + } + + @Test + public void testSliceEmptyRecords() { + MemoryRecords empty = MemoryRecords.EMPTY; + Records sliced = empty.slice(0, 0); + assertEquals(0, sliced.sizeInBytes()); + assertEquals(0, batches(sliced).size()); + } + + /** + * Test slice when already sliced memory records have start position greater than available bytes + * in the memory records. 
+ */ + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException { + LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>(); + recordsPerOffset.put(args.firstOffset, 5); + recordsPerOffset.put(args.firstOffset + 5L, 10); + recordsPerOffset.put(args.firstOffset + 15L, 12); + recordsPerOffset.put(args.firstOffset + 27L, 4); + + MemoryRecords records = createMemoryRecords(args, recordsPerOffset); + List<RecordBatch> items = batches(records.slice(0, records.sizeInBytes())); + + // Slice from third message until the end. + int position = IntStream.range(0, 2).map(i -> items.get(i).sizeInBytes()).sum(); + Records sliced = records.slice(position, records.sizeInBytes() - position); + assertEquals(records.sizeInBytes() - position, sliced.sizeInBytes()); + assertEquals(items.subList(2, items.size()), batches(sliced), "Read starting from the third message"); + + // Further slice the already sliced memory records, from fourth message until the end. Now the + // bytes available in the sliced records are less than the start position. However, the + // position to slice is relative hence reset position to second message in the sliced memory + // records i.e. reset with the size of the third message from the original memory records. 
+ position = items.get(2).sizeInBytes(); + Records finalSliced = sliced.slice(position, sliced.sizeInBytes() - position); + assertEquals(sliced.sizeInBytes() - position, finalSliced.sizeInBytes()); + assertEquals(items.subList(3, items.size()), batches(finalSliced), "Read starting from the fourth message"); + } + + private MemoryRecords createMemoryRecords(Args args, Map<Long, Integer> recordsPerOffset) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + recordsPerOffset.forEach((offset, numOfRecords) -> { + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, + TimestampType.CREATE_TIME, offset); + for (int i = 0; i < numOfRecords; i++) { + builder.appendWithOffset(offset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + } + builder.close(); + }); + buffer.flip(); Review Comment: To be safer, especially if the builder might allocate a new buffer when it needs more space, it's better to use `builder#buffer` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org