apoorvmittal10 commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1967816092
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ##########
@@ -6190,6 +6198,289 @@ public void testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch's first record offset is greater than the record batch's
+     * base offset. Such batches can exist for compacted topics.
+     */
+    @Test
+    public void testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+                TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // The complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT */)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* REJECT */)),
+            new ShareAcknowledgementBatch(15, 16, List.of((byte) 2 /* RELEASE */))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
+    }
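For intuition on the 12-record assertion above: only four records (10-11 and 15-16) physically survive in the batch, but acquisition covers the batch's full offset range from the base offset to the last offset. A minimal standalone sketch of that arithmetic (hypothetical class name, not the SharePartition implementation):

    public class AcquireRangeSketch {
        public static void main(String[] args) {
            long baseOffset = 5;   // record batch base offset
            long lastOffset = 16;  // last offset in the batch; only 10-11 and 15-16 survive compaction
            // The whole offset range is acquired, so the count is 12, not 4.
            long acquired = lastOffset - baseOffset + 1;
            System.out.println(acquired); // prints 12
        }
    }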
+
+    /**
+     * Test the case where the available cached batches never appear again in the fetch response
+     * within the previous fetch offset range. The test also removes records from within the
+     * previously fetched batches.
+     * <p>
+     * Such a case can arise with compacted topics, where complete batches or individual records
+     * within batches are removed.
+     */
+    @Test
+    public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34), which shall create a single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate that the cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+                TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for the batch with base offset 0, starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include the batch from offset 5, and compact the batch starting at offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+                TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for the batch with base offset 20, starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include the batch from offset 45, and compact the batch at offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+                TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for the batch with base offset 50, starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records in the middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines the acquisition range
+        // using the first and last offsets of the fetched batches and acquires all available cached
+        // batches within that range. That means the batch from offset 45-49, which is not included
+        // in the fetch response, will also be acquired. Similarly, the batch from offset 5-19,
+        // which is anyway part of the bigger cached batch of 0-34, will also be acquired. This
+        // avoids iterating through individual fetched batch boundaries; the client is responsible
+        // for reporting any data gaps via acknowledgements. This test also covers the edge case
+        // where the last fetched batch is compacted and its last offset is before the previously
+        // cached version's last offset. In this situation, the last batch's offset state tracking
+        // is initialized. This is handled correctly because the client will send individual offset
+        // acknowledgements, which require offset state tracking anyway. While this last scenario
+        // is unlikely in practice (as a batch's reported last offset should remain correct even
+        // after compaction), the test verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());
+    }
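To make the 59-record assertion above concrete, here is a standalone sketch of the range arithmetic the comment describes (hypothetical types and names, not the actual SharePartition code, and assuming all cached batches are available): the acquisition window spans from the first fetched batch's base offset to the last fetched batch's last offset; whole cached batches inside the window are acquired, and the window end clamps the last one.

    import java.util.List;

    public class RangeAcquireSketch {
        // A cached batch is reduced to its first and last offset for this sketch.
        record CachedBatch(long firstOffset, long lastOffset) {}

        // Count the offsets acquired from cached batches overlapping the fetch window.
        static long acquirableOffsets(List<CachedBatch> cache, long windowStart, long windowEnd) {
            long count = 0;
            for (CachedBatch batch : cache) {
                if (batch.lastOffset() < windowStart || batch.firstOffset() > windowEnd) continue;
                // Batches inside the window are taken whole; only the window end clamps the last one.
                count += Math.min(batch.lastOffset(), windowEnd) - batch.firstOffset() + 1;
            }
            return count;
        }

        public static void main(String[] args) {
            List<CachedBatch> cache = List.of(
                new CachedBatch(0, 34),   // single cache entry for the three original batches
                new CachedBatch(40, 44),
                new CachedBatch(45, 49),  // absent from the compacted fetch response, still acquired
                new CachedBatch(50, 64)); // clamped at the last fetched offset, 63
            // The compacted fetch response spans offsets 0 (first batch base) through 63.
            System.out.println(acquirableOffsets(cache, 0, 63)); // prints 59
        }
    }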
+
+    /**
+     * This test verifies that cached batches which are no longer returned in fetch responses
+     * (starting from the fetchOffset) are correctly archived. Archiving these batches is crucial
+     * for the SPSO and the next fetch offset to advance. Without archiving, these offsets would
+     * be stuck, as the cached batches would remain available.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, previously held in the
+     * cache, are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate that the cache has 3 entries.
+        assertEquals(3, sharePartition.cachedState().size());
+
+        // Compact the second batch and remove the first batch from the fetch response.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
+                TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for the batch with base offset 5, starting from offset 6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Only the second batch should be acquired and the first batch's offsets should be
+        // archived. Send fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // Though the next fetch offset has moved, the start offset should remain the same as we acquire

Review Comment:
   Corrected.
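As a closing illustration of the archiving behaviour the last test's javadoc describes, a hedged sketch (hypothetical names, not the SharePartition implementation): cached batches that lie at or after the fetch offset but entirely before the first fetched batch can never be returned again, so they are archived, which lets the SPSO and the next fetch offset advance past them.

    import java.util.List;

    public class ArchiveBeforeFetchSketch {
        record CachedBatch(long firstOffset, long lastOffset) {}

        // Batches in [fetchOffset, firstFetchedBaseOffset) were compacted away from the log,
        // so select them for archiving; otherwise they would stay available and pin the SPSO.
        static List<CachedBatch> toArchive(List<CachedBatch> cache, long fetchOffset,
                                           long firstFetchedBaseOffset) {
            return cache.stream()
                .filter(b -> b.firstOffset() >= fetchOffset && b.lastOffset() < firstFetchedBaseOffset)
                .toList();
        }

        public static void main(String[] args) {
            List<CachedBatch> cache = List.of(
                new CachedBatch(0, 4),    // removed from the log, never fetched again
                new CachedBatch(5, 9),    // still returned (compacted down to records 6-9)
                new CachedBatch(10, 24));
            // fetchOffset is 0, but the fetch response now starts at batch base offset 5.
            System.out.println(toArchive(cache, 0, 5)); // prints [CachedBatch[firstOffset=0, lastOffset=4]]
        }
    }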