apoorvmittal10 commented on code in PR #20310: URL: https://github.com/apache/kafka/pull/20310#discussion_r2259953363
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7878,6 +7878,107 @@ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); } + @Test + public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + + // Futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batches. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id)))); + + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(2L).offsetState().get(3L).state()); + assertEquals(1, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).batchState()); + assertEquals(1, sharePartition.cachedState().get(7L).batchDeliveryCount()); + + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + + future1.complete(writeShareGroupStateResult); + // The completion of future1 with exception should not impact the cached state since those records have already Review Comment: Thanks, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org