apoorvmittal10 commented on code in PR #19581: URL: https://github.com/apache/kafka/pull/19581#discussion_r2075600196
########## clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java: ########## @@ -1068,6 +1072,149 @@ public void testUnsupportedCompress() { }); } + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testSlice(Args args) { + // Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2, + // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple + // batches in a single MemoryRecords instance. Though with compression, we can have multiple + // appends resulting in a single batch prior RecordBatch.MAGIC_VALUE_V2 as well. + LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>(); + recordsPerOffset.put(args.firstOffset, 3); + recordsPerOffset.put(args.firstOffset + 6L, 8); + recordsPerOffset.put(args.firstOffset + 15L, 4); + MemoryRecords records = createMemoryRecords(args, recordsPerOffset); + + // Test slicing from start + MemoryRecords sliced = records.slice(0, records.sizeInBytes()); + assertEquals(records.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(records.validBytes(), sliced.validBytes()); + TestUtils.checkEquals(records.batches(), sliced.batches()); + + List<RecordBatch> items = batches(records); + // Test slicing first message. + RecordBatch first = items.get(0); + sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); + + // Read from second message and size is past the end of the file. 
+ sliced = records.slice(first.sizeInBytes(), records.sizeInBytes()); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); + + // Read from second message and position + size overflows. + sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE); + assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); + + // Read a single message starting from second message. + RecordBatch second = items.get(1); + sliced = records.slice(first.sizeInBytes(), second.sizeInBytes()); + assertEquals(second.sizeInBytes(), sliced.sizeInBytes()); + assertEquals(Collections.singletonList(second), batches(sliced), "Read a single message starting from the second message"); + + // Read from already sliced view. + List<RecordBatch> remainingItems = IntStream.range(0, items.size()).filter(i -> i != 0 && i != 1).mapToObj(items::get).collect(Collectors.toList()); + int remainingSize = remainingItems.stream().mapToInt(RecordBatch::sizeInBytes).sum(); + sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()) + .slice(second.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes() - second.sizeInBytes()); + assertEquals(remainingSize, sliced.sizeInBytes()); + assertEquals(remainingItems, batches(sliced), "Read starting from the third message"); + + // Read from second message and size is past the end of the file on the already sliced view. + sliced = records.slice(1, records.sizeInBytes() - 1) Review Comment: Yes, that's true. The test case also verifies that, although the first slice started at an arbitrary position, a subsequent slice of that view at the correct batch boundary returned the correct data. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org