chirag-wadhwa5 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2556752233


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1762,10 +1779,57 @@ private List<AcquiredRecords> createBatches(
                 .setLastOffset(lastAcquiredOffset)
                 .setDeliveryCount((short) 1));
 
-            result.forEach(acquiredRecords -> {
-                // Schedule acquisition lock timeout for the batch.
-                AcquisitionLockTimerTask timerTask = 
scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), 
acquiredRecords.lastOffset());
-                // Add the new batch to the in-flight records along with the 
acquisition lock timeout task for the batch.
+            if (isRecordLimitMode) {
+                // In record_limit mode, there will always be only one single 
batch in the result.
+                AcquiredRecords acquiredRecords = result.get(0);
+                // When the count of acquired records exceeds the max fetch 
limit, only initialize and schedule acquisition lock for
+                // acquired records up to the max fetch boundary and remaining 
offsets should still in available state.
+                // i.e. acquired records are 10-19 (10 records) and max fetch 
records is 5, then only 10-14 should be acquired
+                // and offset 15-19 should still in available state.
+                if (acquiredRecords.lastOffset() - 
acquiredRecords.firstOffset() + 1 > maxFetchRecords) {
+                    InFlightBatch inFlightBatch = new InFlightBatch(
+                        timer,
+                        time,
+                        memberId,
+                        acquiredRecords.firstOffset(),
+                        acquiredRecords.lastOffset(),
+                        RecordState.ACQUIRED,
+                        1,
+                        null,
+                        timeoutHandler,
+                        sharePartitionMetrics);
+                    int delayMs = 
recordLockDurationMsOrDefault(groupConfigManager, groupId, 
defaultRecordLockDurationMs);
+                    long lastOffset = acquiredRecords.firstOffset() + 
maxFetchRecords - 1;
+                    acquiredRecords.setLastOffset(lastOffset);
+                    inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, 
delayMs);
+                    updateFindNextFetchOffset(true);
+
+                    cachedState.put(acquiredRecords.firstOffset(), 
inFlightBatch);
+                    sharePartitionMetrics.recordInFlightBatchMessageCount(
+                        acquiredRecords.lastOffset() - 
acquiredRecords.firstOffset() + 1);

Review Comment:
   I agree with @JimmyWang6. As per the current implementation, a single batch 
could have different records in different states, acquired by different share 
consumers. When a batch is indeed broken because of a subset of records being 
acknowledged with a different type, the metric is not updated even then. In 
fact, a new value is recorded with the metric only when a new in-flight batch 
is being created. In this case, the new batch's boundary is between 
acquiredRecords.lastOffset() and acquiredRecords.firstOffset(); therefore, 
according to me, the current implementation is more sensible here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to