chirag-wadhwa5 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1940953629


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1951,164 +2112,1102 @@ public void 
testAcquireReleasedRecordMultipleBatches() {
         // Fourth fetch request with 5 records starting from offset 28.
         MemoryRecords records4 = memoryRecords(5, 28);
 
-        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 40, 3, records1,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records4,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(33, sharePartition.nextFetchOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(28L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            Collections.singletonList(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchState());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(15L).batchMemberId());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(23L).batchState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(23L).batchMemberId());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(28L).batchState());
+        assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+
+        expectedOffsetStateMap.clear();
+        expectedOffsetStateMap.put(28L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(29L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(30L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(31L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(32L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(28L).offsetState());
+
+        // Send next batch from offset 12, only 3 records should be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Though record2 batch exists to acquire but send batch record3, it 
should be acquired but
+        // next fetch offset should not move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from batch 2.
+        MemoryRecords subsetRecords = memoryRecords(2, 17);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from record 4 to further test if the next 
fetch offset move
+        // accordingly once complete record 2 is also acquired.
+        subsetRecords = memoryRecords(1, 28);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            1);
+
+        assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Try to acquire complete record 2 though it's already partially 
acquired, the next fetch
+        // offset should move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        // Offset 15,16 and 19 should be acquired.
+        List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(15, 16, 2);
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(29, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // All records fetched are part of the gap. The gap is from 11 to 20, 
fetched offsets are 11 to 15.
+        MemoryRecords records = memoryRecords(5, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 15, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(16, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(16, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // Fetched offsets overlap the inFlight batches. The gap is from 11 to 
20, but fetched records are from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(41, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // Fetched offsets overlap the inFlight batches. The gap is from 11 to 
20, but fetched records are from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            15);
+
+        // The gap from 11 to 20 will be acquired. Since the next batch is 
AVAILABLE, and we records fetched from replica manager
+        // overlap with the next batch, some records from the next batch will 
also be acquired
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(26, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(26, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // Fetched records are part of inFlightBatch 11-20 with state 
AVAILABLE. Fetched offsets also overlap the
+        // inFlight batches. The gap is from 11 to 20, but fetched records are 
from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            15);
+
+        // 2 different batches will be acquired this time (11-20 and 21-25). 
The first batch will have delivery count 3
+        // as previous deliveryCount was 2. The second batch will have 
delivery count 1 as it is acquired for the first time.
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(26, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(26, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+                        new PersisterStateBatch(81L, 90L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        MemoryRecords records = memoryRecords(75, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            55);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 41-50 (AVAILABLE batch in cachedState)
+        // 4. 51-60 (gap offsets)
+        // 5. 71-80 (gap offsets)
+        // 6. 81-85 (AVAILABLE batch in cachedState). These will be acquired 
as separate batches because we are breaking a batch in the cachedState
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(86, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(86, initialReadGapOffset.gapStartOffset());
+        assertEquals(90, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(70, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        MemoryRecords records = memoryRecords(20, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            20);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 21-30 (AVAILABLE batch in cachedState)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(70, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(31, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(31, initialReadGapOffset.gapStartOffset());
+        assertEquals(70, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+                        new PersisterStateBatch(81L, 90L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        MemoryRecords records = memoryRecords(65, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            45);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 41-50 (AVAILABLE batch in cachedState)
+        // 4. 51-60 (gap offsets)
+        // 5. 71-75 (gap offsets). The gap is from 71 to 80, but the fetched 
records end at 75. These gap offsets will be acquired as a single batch
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 75, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(76, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(76, initialReadGapOffset.gapStartOffset());
+        assertEquals(90, initialReadGapOffset.endOffset());
+    }
+
+
+    @Test
+    public void 
testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        // The start offset will be moved to 21, since the offsets 11 to 20 
are acknowledged, and will be removed
+        // from cached state in the maybeUpdateCachedStateAndOffsets method
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        // Creating 3 batches of records with a total of 8 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 3, 21).close();
+        memoryRecordsBuilder(buffer, 3, 24).close();
+        memoryRecordsBuilder(buffer, 2, 27).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                6, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            6);
+
+        // Since max fetch records (6) is less than the number of records 
fetched (8), only 6 records will be acquired
+        assertArrayEquals(expectedAcquiredRecord(21, 26, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(27, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(27, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11-20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // Creating 3 batches of records with a total of 8 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 11).close();
+        memoryRecordsBuilder(buffer, 10, 21).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                8, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(11, sharePartition.nextFetchOffset());
+
+        // Creating 3 batches of records with a total of 8 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 11).close();
+        memoryRecordsBuilder(buffer, 20, 21).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                8, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps() {

Review Comment:
   The gaps actually correspond with the gap in the cached state (21 to 29). 
The memory records created have the following batches ->
   (10, 20)
   (30, 50)
   
   The significance of this test lies in the fact that the natural gap is in 
between the batches fetched from the partition, but also coincide with the gap 
in cached state. In this case, the broker acquire the gap because the broker 
does not parse all the batches in the fetched record, so it's not aware of the 
presence of any natural gaps. The broker only knows the range of offsets 
fetched (from 10 to 50), and will assume all these offsets contain information. 
It is then the client's responsibility to inform the broker about the natural 
gap in the batch of 21-29. I will add some comments in the test as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to