apoorvmittal10 commented on code in PR #20310:
URL: https://github.com/apache/kafka/pull/20310#discussion_r2259953363


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7878,6 +7878,107 @@ public void 
testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup
         
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
     }
 
+    @Test
+    public void testRecordArchivedWithWriteStateRPCFailure() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+        // Futures which will be completed later, so the batch state has 
ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Acknowledge batches.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
+
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(2L).offsetState().get(3L).state());
+        assertEquals(1, 
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+        future1.complete(writeShareGroupStateResult);
+        // The completion of future1 with exception should not impact the 
cached state since those records have already

Review Comment:
   Thanks, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to