apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1939357943


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1825,8 +1922,8 @@ private boolean canMoveStartOffset() {
 
         NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset);
         if (entry == null) {
-            log.error("The start offset: {} is not found in the cached state for share partition: {}-{}."
-                + " Cannot move the start offset.", startOffset, groupId, topicIdPartition);
+            log.debug("The start offset: {} is not found in the cached state for share partition: {}-{} " +
+                "as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition);
             return false;

Review Comment:
   Please add a comment on top explaining why the entry can now be null in some cases.
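   One possible phrasing, as a sketch only (assuming the only case where `floorEntry(startOffset)` can now return null is the acquirable gap at the beginning left by the initial read, per the new debug message above):
   ```java
   // The cached state may begin after the share partition's start offset when the initial
   // read from the persister had a gap at the front i.e. startOffset sits in an acquirable
   // gap that has no in-flight batch yet. floorEntry(startOffset) then returns null and the
   // start offset cannot be moved until that gap is acquired.
   NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset);
   ```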



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1847,12 +1948,20 @@ private boolean isRecordStateAcknowledged(RecordState recordState) {
         return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
     }
 
-    private long findLastOffsetAcknowledged() {
-        lock.readLock().lock();
+    // Visible for testing
+    long findLastOffsetAcknowledged() {
         long lastOffsetAcknowledged = -1;
+        lock.readLock().lock();
         try {
             for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
                 InFlightBatch inFlightBatch = entry.getValue();
+                // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch
+                // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between
+                // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present
+                // in the cachedState
+                if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+                    return lastOffsetAcknowledged;
+                }

Review Comment:
   nit: line break here please.
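   For readers of the thread, a tiny standalone illustration of why the loop stops early at the gap window (hypothetical offsets; a plain TreeMap of batch first offset -> last offset stands in for cachedState, and all cached batches are assumed acknowledged):
   ```java
   import java.util.Map;
   import java.util.TreeMap;

   public class FindLastOffsetAcknowledgedSketch {
       public static void main(String[] args) {
           // Cached batches (0-10), (11-20) and (31-40); offsets 21-30 are an acquirable gap.
           TreeMap<Long, Long> cachedState = new TreeMap<>();
           cachedState.put(0L, 10L);
           cachedState.put(11L, 20L);
           cachedState.put(31L, 40L);

           long gapStartOffset = 21L; // initialReadGapOffset.gapStartOffset()
           long lastOffsetAcknowledged = -1;
           for (Map.Entry<Long, Long> entry : cachedState.entrySet()) {
               // Stop once a batch reaches the gap window: offsets from the gap start
               // onwards cannot be counted as acknowledged yet.
               if (entry.getValue() >= gapStartOffset) {
                   break;
               }
               lastOffsetAcknowledged = entry.getValue();
           }
           System.out.println(lastOffsetAcknowledged); // prints 20
       }
   }
   ```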



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1847,12 +1948,20 @@ private boolean isRecordStateAcknowledged(RecordState recordState) {
         return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
     }
 
-    private long findLastOffsetAcknowledged() {
-        lock.readLock().lock();
+    // Visible for testing
+    long findLastOffsetAcknowledged() {
         long lastOffsetAcknowledged = -1;
+        lock.readLock().lock();
         try {
             for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
                 InFlightBatch inFlightBatch = entry.getValue();
+                // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch
+                // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between
+                // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present
+                // in the cachedState

Review Comment:
   I am not sure this line of the comment is helping here. Maybe remove it.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -444,6 +450,11 @@ public CompletableFuture<Void> maybeInitialize() {
                 stateEpoch = partitionData.stateEpoch();
 
                 List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
+                boolean isGapPresentInStateBatches = false;

Review Comment:
   long gapStartOffset = -1;



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -452,6 +463,10 @@ public CompletableFuture<Void> maybeInitialize() {
                         throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
                         return;
                     }
+                    if (stateBatch.firstOffset() > previousBatchLastOffset + 1) {
+                        isGapPresentInStateBatches = true;

Review Comment:
   gapStartOffset = previousBatchLastOffset + 1;



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +682,27 @@ public ShareAcquiredRecords acquire(
                 }
 
                 InFlightBatch inFlightBatch = entry.getValue();
+

Review Comment:
   nit: remove line break.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // E.g., the cachedState has the following state batches -> {(0, 10), (11, 20), (31, 40)} and all these batches are acked.
+                    // There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case,
+                    // lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of the next cachedState batch (the next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30)
+                    // and the startOffset should be at 21. Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset
+                    // and the higher key of lastOffsetAcknowledged.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);

Review Comment:
   Do we have a test case covering this code?
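   Not the actual test, but to make the expectation concrete, here is a standalone sketch of the offset math this branch implements (hypothetical values; a plain TreeMap of batch first offset -> last offset stands in for cachedState):
   ```java
   import java.util.TreeMap;

   public class GapAwareStartOffsetSketch {
       public static void main(String[] args) {
           // Cached batches (0-10), (11-20) and (31-40); offsets 21-30 are an acquirable gap.
           TreeMap<Long, Long> cachedState = new TreeMap<>();
           cachedState.put(0L, 10L);
           cachedState.put(11L, 20L);
           cachedState.put(31L, 40L);

           long gapStartOffset = 21L;          // initialReadGapOffset.gapStartOffset()
           long lastOffsetAcknowledged = 20L;  // batches 0-10 and 11-20 are fully acked

           // lastOffsetAcknowledged is the last offset of its batch, so the naive new
           // start offset would be the first offset of the next cached batch (31) ...
           long startOffset = cachedState.higherKey(lastOffsetAcknowledged);
           // ... but the acquirable gap must not be skipped, so clamp to the gap start.
           startOffset = Math.min(gapStartOffset, startOffset);

           System.out.println(startOffset); // prints 21, not 31
       }
   }
   ```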



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -896,6 +896,170 @@ public void testMaybeInitializeWithReadException() {
         assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
     }
 
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(21L, 30L, RecordState.ARCHIVED.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(21L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(30, sharePartition.cachedState().get(21L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState());
+        assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(21L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(30, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithMultipleGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(30L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(30L).batchState());
+        assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(15, sharePartition.startOffset());

Review Comment:
   Hmm, this seems problematic. So 15-20 are acknowledged and the start offset is 15, hence ideally it should move. But because of the gapStartOffset tracking I think we can't, correct? So should we set the gap start offset from the first actual gap in the maybeInitialize method?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -462,6 +477,10 @@ public CompletableFuture<Void> maybeInitialize() {
                     // in the cached state are not missed
                     findNextFetchOffset.set(true);
                     endOffset = cachedState.lastEntry().getValue().lastOffset();
+                    // initialReadGapOffset is not required if there are no gaps in the read state response
+                    if (isGapPresentInStateBatches) {
+                        initialReadGapOffset = new InitialReadGapOffset(endOffset, startOffset);

Review Comment:
   ```suggestion
                        initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset);
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -462,6 +477,10 @@ public CompletableFuture<Void> maybeInitialize() {
                     // in the cached state are not missed
                     findNextFetchOffset.set(true);
                     endOffset = cachedState.lastEntry().getValue().lastOffset();
+                    // initialReadGapOffset is not required if there are no gaps in the read state response
+                    if (isGapPresentInStateBatches) {

Review Comment:
   if (gapStartOffset != -1) {



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -452,6 +463,10 @@ public CompletableFuture<Void> maybeInitialize() {
                         throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
                         return;
                     }
+                    if (stateBatch.firstOffset() > previousBatchLastOffset + 1) {

Review Comment:
   ```suggestion
                       if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) {
   ```
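   Taken together, the `gapStartOffset` suggestions in this thread would look roughly like the sketch below. This is only a sketch assembled from the quoted hunks and suggestions, not the actual patch; the surrounding loop structure and the `previousBatchLastOffset` initialisation are assumptions.
   ```java
   // Track the start of the first gap between persisted state batches directly,
   // instead of a boolean flag, so it can seed initialReadGapOffset.
   long gapStartOffset = -1;
   long previousBatchLastOffset = startOffset - 1; // assumed initialisation
   for (PersisterStateBatch stateBatch : stateBatches) {
       // ... existing validation of stateBatch elided ...
       // Record only the first gap; later gaps fall inside the [gapStartOffset, endOffset]
       // window anyway.
       if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) {
           gapStartOffset = previousBatchLastOffset + 1;
       }
       previousBatchLastOffset = stateBatch.lastOffset();
       // ... existing cachedState population elided ...
   }

   // Later, once endOffset is known from the cached state:
   if (gapStartOffset != -1) {
       initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset);
   }
   ```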



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
