adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2031757437
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L return errorMessage.toString(); } + @Test + public void testFilterRecordBatchesFromAcquiredRecords() { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build(); + + List<AcquiredRecords> acquiredRecords1 = List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1) + ); + List<RecordBatch> recordBatches1 = List.of( + memoryRecordsBuilder(3, 2).build().batches().iterator().next(), + memoryRecordsBuilder(3, 12).build().batches().iterator().next() + ); + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)), + sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1)); + + List<AcquiredRecords> acquiredRecords2 = List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3) + ); + List<RecordBatch> recordBatches2 = List.of( + memoryRecordsBuilder(21, 
5).build().batches().iterator().next(), + memoryRecordsBuilder(5, 31).build().batches().iterator().next() + ); + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3) + + ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2) + ); + + // Record batches is empty. + assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of())); + + List<AcquiredRecords> acquiredRecords3 = List.of( + new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1) + ); + List<RecordBatch> recordBatches3 = List.of( + memoryRecordsBuilder(1, 8).build().batches().iterator().next(), + memoryRecordsBuilder(1, 18).build().batches().iterator().next() + ); + + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1) + + ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3) + ); + } + + @Test + public void testAcquireWithReadCommittedIsolationLevel() { + SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build()); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 5, 10).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); + memoryRecordsBuilder(buffer, 15, 20).close(); + memoryRecordsBuilder(buffer, 8, 50).close(); + memoryRecordsBuilder(buffer, 10, 58).close(); + memoryRecordsBuilder(buffer, 5, 70).close(); + + buffer.flip(); + MemoryRecords records = 
MemoryRecords.readableRecords(buffer); + FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions()); + + // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived. + // We won't be utilizing the aborted transactions passed in fetchPartitionData. + when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn( + List.of( + memoryRecordsBuilder(5, 10).build().batches().iterator().next(), + memoryRecordsBuilder(10, 58).build().batches().iterator().next(), + memoryRecordsBuilder(5, 70).build().batches().iterator().next() + ) + ); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords( + sharePartition.acquire( + MEMBER_ID, + 10 /* Batch size */, + 100, + DEFAULT_FETCH_OFFSET, + fetchPartitionData, + FetchIsolation.TXN_COMMITTED), + 45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */); + + assertEquals(List.of( + new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short) 1) + ), acquiredRecordsList); + assertEquals(75, sharePartition.nextFetchOffset()); + + // Checking cached state. 
+ assertEquals(4, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().containsKey(10L)); + assertTrue(sharePartition.cachedState().containsKey(20L)); + assertTrue(sharePartition.cachedState().containsKey(50L)); + assertTrue(sharePartition.cachedState().containsKey(70L)); + assertNotNull(sharePartition.cachedState().get(10L).offsetState()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState()); + + assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset()); + assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(70L).batchState()); + + assertEquals(MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(70L).batchMemberId()); + + assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask()); + + Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); + + assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask()); + + expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(50L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(51L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(52L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(53L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(54L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(55L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(56L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(57L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(58L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(59L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(60L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(61L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(62L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(63L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(64L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(65L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(66L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(67L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(68L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(69L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(50L).offsetState()); + + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask()); + 
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask()); + } + + @Test + public void testContainsAbortMarker() { + SharePartition sharePartition 
= SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build(); + + // Record batch is not a control batch. + RecordBatch recordBatch = mock(RecordBatch.class); + when(recordBatch.isControlBatch()).thenReturn(false); + assertFalse(sharePartition.containsAbortMarker(recordBatch)); + + // Record batch is a control batch but doesn't contain any records.. + recordBatch = mock(RecordBatch.class); + Iterator batchIterator = mock(Iterator.class); + when(batchIterator.hasNext()).thenReturn(false); + when(recordBatch.iterator()).thenReturn(batchIterator); + when(recordBatch.isControlBatch()).thenReturn(true); + assertFalse(sharePartition.containsAbortMarker(recordBatch)); + + // Record batch is a control batch which contains a record of type ControlRecordType.ABORT. + recordBatch = mock(RecordBatch.class); + batchIterator = mock(Iterator.class); + when(batchIterator.hasNext()).thenReturn(true); + DefaultRecord record = mock(DefaultRecord.class); + ByteBuffer buffer = ByteBuffer.allocate(4096); + // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.ABORT. + buffer.putShort((short) 5); + buffer.putShort(ControlRecordType.ABORT.type()); + buffer.putInt(23432); // some field added in version 5 + buffer.flip(); + when(record.key()).thenReturn(buffer); + when(batchIterator.next()).thenReturn(record); + when(recordBatch.iterator()).thenReturn(batchIterator); + when(recordBatch.isControlBatch()).thenReturn(true); + assertTrue(sharePartition.containsAbortMarker(recordBatch)); + + // Record batch is a control batch which contains a record of type ControlRecordType.COMMIT. + recordBatch = mock(RecordBatch.class); + batchIterator = mock(Iterator.class); + when(batchIterator.hasNext()).thenReturn(true); + record = mock(DefaultRecord.class); + buffer = ByteBuffer.allocate(4096); + // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT. 
+ buffer.putShort((short) 5); + buffer.putShort(ControlRecordType.COMMIT.type()); + buffer.putInt(23432); // some field added in version 5 + buffer.flip(); + when(record.key()).thenReturn(buffer); + when(batchIterator.next()).thenReturn(record); + when(recordBatch.iterator()).thenReturn(batchIterator); + when(recordBatch.isControlBatch()).thenReturn(true); + assertFalse(sharePartition.containsAbortMarker(recordBatch)); + } + + @Test + public void testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build(); + + // Case 1 - Creating 10 transactional records in a single batch followed by a ABORT marker record for producerId 1. + ByteBuffer buffer = ByteBuffer.allocate(1024); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0); + buffer.flip(); + Records records = MemoryRecords.readableRecords(buffer); + + List<FetchResponseData.AbortedTransaction> abortedTransactions = List.of( + new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1) + ); + // records from 0 to 9 should be archived because they are a part of aborted transactions. + List<RecordBatch> actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions); + assertEquals(1, actual.size()); + assertEquals(0, actual.get(0).baseOffset()); + assertEquals(9, actual.get(0).lastOffset()); + assertEquals(1, actual.get(0).producerId()); + + // Case 2: 3 individual batches each followed by a ABORT marker record for producerId 1. 
+ buffer = ByteBuffer.allocate(1024); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4); + buffer.flip(); + records = MemoryRecords.readableRecords(buffer); + abortedTransactions = List.of( + new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1), + new FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1), + new FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1) + ); + + actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions); + assertEquals(3, actual.size()); + assertEquals(0, actual.get(0).baseOffset()); + assertEquals(0, actual.get(0).lastOffset()); + assertEquals(1, actual.get(0).producerId()); + assertEquals(2, actual.get(1).baseOffset()); + assertEquals(2, actual.get(1).lastOffset()); + assertEquals(1, actual.get(1).producerId()); + assertEquals(4, actual.get(2).baseOffset()); + assertEquals(4, actual.get(2).lastOffset()); + assertEquals(1, actual.get(2).producerId()); + + // Case 3: The producer id of records is different, so they should not be archived, + buffer = ByteBuffer.allocate(1024); + // We are creating 10 transactional records followed by a ABORT marker record for producerId 2. 
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0); + buffer.flip(); + records = MemoryRecords.readableRecords(buffer); + abortedTransactions = List.of( + new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1) + ); + + actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions); + assertEquals(0, actual.size()); + } + + @Test + public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build(); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0); + newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9); + newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12); + newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15); + buffer.flip(); + Records records = MemoryRecords.readableRecords(buffer); + + // Case 1 - Aborted transactions does not contain the record batch from 4-5 with producer id 2. Review Comment: Corrected the comment to `Case 1 - Aborted transactions does not contain the record batch from offsets 6-7 with producer id 2.` I added this item `new FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1)` to `abortedTransactions` to show that even if offsets 6-7 are mentioned under `abortedTransactions`, they won't get filtered out for archiving since they have a different `producerId`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org