junrao commented on code in PR #19581: URL: https://github.com/apache/kafka/pull/19581#discussion_r2069620518
##########
clients/src/main/java/org/apache/kafka/common/record/Records.java:
##########

@@ -90,4 +91,18 @@ public interface Records extends TransferableRecords {
      * @return The record iterator
      */
     Iterable<Record> records();
+
+    /**
+     * Return a slice of records from this instance, which is a view into this set starting from the given position
+     * and with the given size limit.
+     *
+     * If the size is beyond the end of the records, the end will be based on the size of the records at the time of the read.
+     *
+     * If this records set is already sliced, the position will be taken relative to that slicing.
+     *
+     * @param position The start position to begin the read from

Review Comment:
   Could we add that position is expected to be aligned at the batch boundary? Otherwise, the returned Records can't be iterated.
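For context on the alignment requirement: since `Records` already exposes `batches()`, a caller can always derive a boundary-aligned position by walking batch sizes. A minimal sketch, assuming only the existing `Records.batches()` and `RecordBatch.sizeInBytes()` APIs; the class and method names are hypothetical and not part of this PR:

    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;

    // Hypothetical helper: find the largest batch-aligned position that does
    // not exceed a target byte offset, so the resulting slice stays iterable.
    final class SliceAlignment {
        static int batchAlignedPosition(Records records, int target) {
            int position = 0;
            for (RecordBatch batch : records.batches()) {
                if (position + batch.sizeInBytes() > target)
                    break;                       // crossing the next boundary would overshoot
                position += batch.sizeInBytes(); // advance one whole batch at a time
            }
            return position;                     // always lands on a batch boundary
        }
    }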
##########
clients/src/main/java/org/apache/kafka/common/record/Records.java:
##########

@@ -90,4 +91,18 @@ public interface Records extends TransferableRecords {
     Iterable<Record> records();
+
+    /**
+     * Return a slice of records from this instance, which is a view into this set starting from the given position

Review Comment:
   We are duplicating the same javadoc in 3 places. We could just keep it once in Records.

##########
clients/src/main/java/org/apache/kafka/common/record/Records.java:
##########

@@ -90,4 +91,18 @@ public interface Records extends TransferableRecords {
+     * @param position The start position to begin the read from
+     * @param size The number of bytes after the start position to include
+     * @return A sliced wrapper on this message set limited based on the given position and size
+     */
+    Records slice(int position, int size) throws IOException;

Review Comment:
   Could we have both MemoryRecords and FileRecords return Records?
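For context on the return-type question: Java permits covariant overrides, so both options are available, and a uniform `Records` return in the interface does not force the implementations to widen. A simplified, self-contained sketch with stand-in names (`SlicableRecords`, `InMemory`, `OnDisk` mirror the roles of `Records`, `MemoryRecords`, `FileRecords` but are not the real classes):

    import java.io.IOException;

    // The interface declares the wide type; each implementation may narrow it.
    interface SlicableRecords {
        SlicableRecords slice(int position, int size) throws IOException;
    }

    class InMemory implements SlicableRecords {
        @Override
        public InMemory slice(int position, int size) {
            // Overrides may narrow the return type and drop checked exceptions:
            // in-memory slicing needs no I/O, so no IOException is declared.
            return this;
        }
    }

    class OnDisk implements SlicableRecords {
        @Override
        public OnDisk slice(int position, int size) throws IOException {
            return this; // a file-backed view would adjust start/end offsets here
        }
    }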
##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########

@@ -1068,6 +1072,149 @@ public void testUnsupportedCompress() {
         });
     }
 
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSlice(Args args) {
+        // Create a MemoryRecords instance with multiple batches. Prior to RecordBatch.MAGIC_VALUE_V2,
+        // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple
+        // batches in a single MemoryRecords instance. Though with compression, we can have multiple
+        // appends resulting in a single batch prior to RecordBatch.MAGIC_VALUE_V2 as well.
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(args.firstOffset, 3);
+        recordsPerOffset.put(args.firstOffset + 6L, 8);
+        recordsPerOffset.put(args.firstOffset + 15L, 4);
+        MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+
+        // Test slicing from start
+        MemoryRecords sliced = records.slice(0, records.sizeInBytes());
+        assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(records.validBytes(), sliced.validBytes());
+        TestUtils.checkEquals(records.batches(), sliced.batches());
+
+        List<RecordBatch> items = batches(records);
+        // Test slicing first message.
+        RecordBatch first = items.get(0);
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
+
+        // Read from second message and size is past the end of the file.
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
+
+        // Read from second message and position + size overflows.
+        sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
+
+        // Read a single message starting from second message.
+        RecordBatch second = items.get(1);
+        sliced = records.slice(first.sizeInBytes(), second.sizeInBytes());
+        assertEquals(second.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(Collections.singletonList(second), batches(sliced), "Read a single message starting from the second message");
+
+        // Read from already sliced view.
+        List<RecordBatch> remainingItems = IntStream.range(0, items.size()).filter(i -> i != 0 && i != 1).mapToObj(items::get).collect(Collectors.toList());
+        int remainingSize = remainingItems.stream().mapToInt(RecordBatch::sizeInBytes).sum();
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes())
+                .slice(second.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes() - second.sizeInBytes());
+        assertEquals(remainingSize, sliced.sizeInBytes());
+        assertEquals(remainingItems, batches(sliced), "Read starting from the third message");
+
+        // Read from second message and size is past the end of the file on the already sliced view.
+        sliced = records.slice(1, records.sizeInBytes() - 1)

Review Comment:
   The expected usage is that position will be aligned with batch boundary, right?
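For context on the chained slices in this test: the positions look misaligned, but they compound to a batch boundary, since `slice(1, n - 1)` shifts the view's origin by one byte and the inner `slice(first.sizeInBytes() - 1, ...)` then starts at absolute offset `1 + first.sizeInBytes() - 1 = first.sizeInBytes()`. `ByteBuffer` views compose the same way, as this standalone sketch shows (it does not use the PR's code):

    import java.nio.ByteBuffer;

    public class SliceArithmetic {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7});
            ByteBuffer outer = buf.duplicate();
            outer.position(1);                 // origin of the view shifted by 1
            ByteBuffer inner = outer.slice();  // view over bytes 1..7
            inner.position(3);                 // 1 + 3 = absolute offset 4
            System.out.println(inner.slice().get(0)); // prints 4
        }
    }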
##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########

@@ -1068,6 +1072,149 @@ public void testUnsupportedCompress() {
+        // Read from second message and size is past the end of the file on the already sliced view.
+        sliced = records.slice(1, records.sizeInBytes() - 1)
+                .slice(first.sizeInBytes() - 1, records.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
+
+        // Read from second message and position + size overflows on the already sliced view.
+        sliced = records.slice(1, records.sizeInBytes() - 1)
+                .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceInvalidPosition(Args args) {
+        MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(-1, records.sizeInBytes()));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(records.sizeInBytes() + 1, records.sizeInBytes()));
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceInvalidSize(Args args) {
+        MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(0, -1));
+    }
+
+    @Test
+    public void testSliceEmptyRecords() {
+        MemoryRecords empty = MemoryRecords.EMPTY;
+        MemoryRecords sliced = empty.slice(0, 0);
+        assertEquals(0, sliced.sizeInBytes());
+        assertEquals(0, sliced.validBytes());
+        assertEquals(0, batches(sliced).size());
+    }
+
+    /**
+     * Test slice when already sliced memory records have start position greater than available bytes
+     * in the memory records.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceForAlreadySlicedFileRecords(Args args) {

Review Comment:
   The name seems inaccurate since it's testing MemoryRecords.
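For context: the invalid-argument tests quoted above pin down the validation surface of `slice`. A hypothetical sketch of checks consistent with those assertions (the PR's actual implementation may differ):

    // Hypothetical validation shape implied by testSliceInvalidPosition and
    // testSliceInvalidSize. Note an oversized `size` is deliberately not an
    // error: testSlice expects the view to be clamped to the end of the records.
    final class SliceValidation {
        static void validate(int position, int size, int sizeInBytes) {
            if (position < 0 || position > sizeInBytes)
                throw new IllegalArgumentException("Invalid position: " + position);
            if (size < 0)
                throw new IllegalArgumentException("Invalid size: " + size);
        }
    }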
##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########

@@ -1068,6 +1072,149 @@ public void testUnsupportedCompress() {
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSlice(Args args) {
+        // Create a MemoryRecords instance with multiple batches. Prior to RecordBatch.MAGIC_VALUE_V2,
+        // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple

Review Comment:
   Hmm, even before MAGIC_VALUE_V2, a MemoryRecords can have multiple batches.
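For context: this claim is observable for any magic version, because shallow batches are exposed through `batches()`; per the test's own comment, each uncompressed append is its own batch before MAGIC_VALUE_V2, so a pre-V2 MemoryRecords with several appends reports several batches. An illustration-only helper (hypothetical name):

    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;

    // Counts shallow batches in any Records instance, regardless of magic version.
    final class BatchCounter {
        static int countBatches(Records records) {
            int count = 0;
            for (RecordBatch ignored : records.batches())
                count++;
            return count;
        }
    }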
##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########

@@ -1068,6 +1072,149 @@ public void testUnsupportedCompress() {
+    public void testSliceForAlreadySlicedFileRecords(Args args) {
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(args.firstOffset, 5);
+        recordsPerOffset.put(args.firstOffset + 5L, 10);
+        recordsPerOffset.put(args.firstOffset + 15L, 12);
+        recordsPerOffset.put(args.firstOffset + 27L, 4);
+
+        MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+        List<RecordBatch> items = batches(records.slice(0, records.sizeInBytes()));
+
+        // Slice from third message until the end.
+        int position = IntStream.range(0, 2).map(i -> items.get(i).sizeInBytes()).sum();
+        MemoryRecords sliced = records.slice(position, records.sizeInBytes() - position);
+        assertEquals(records.sizeInBytes() - position, sliced.sizeInBytes());
+        assertEquals(items.subList(2, items.size()), batches(sliced), "Read starting from the third message");
+
+        // Further slice the already sliced memory records, from fourth message until the end. Now the

Review Comment:
   Not sure I understand the comment. The code slices from the 3rd message, instead of the 4th.
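For context on the off-by-one the review points out: summing the sizes of batches 0..k-1 yields the byte offset where batch k begins, so `IntStream.range(0, 2)` lands on batch index 2, the 3rd message. A standalone prefix-sum sketch with arbitrary example sizes (not the PR's code):

    import java.util.List;

    public class SlicePositionDemo {
        public static void main(String[] args) {
            List<Integer> batchSizes = List.of(100, 120, 90, 80); // hypothetical batch sizes
            // Sum of the first two batch sizes = start offset of batch index 2.
            int position = batchSizes.subList(0, 2).stream().mapToInt(Integer::intValue).sum();
            System.out.println("slice at " + position + " starts at batch index 2 (the 3rd message)");
        }
    }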