apoorvmittal10 commented on code in PR #19430:
URL: https://github.com/apache/kafka/pull/19430#discussion_r2052861184


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -197,6 +197,16 @@ public byte id() {
         }
     }
 
+    /**
+     * The DeliveryCountOps is used to specify the behavior on the delivery 
count: increase, decrease,
+     * or do nothing.
+     */
+    enum DeliveryCountOps {
+        INCREASE,
+        DECREASE,
+        NOOPS

Review Comment:
   will it be better?
   ```suggestion
           NO_OP
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3003,24 +3014,32 @@ private boolean hasOngoingStateTransition() {
          * if the state update is unsuccessful.
          *
          * @param newState The new state of the records.
-         * @param incrementDeliveryCount Whether to increment the delivery 
count.
+         * @param ops      The behavior on the delivery count.
          *
          * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
          *         helps update chaining.
          */
-        private InFlightState tryUpdateState(RecordState newState, boolean 
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+        private InFlightState tryUpdateState(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            boolean decreaseFlag = false;
             try {
+                if (ops == DeliveryCountOps.DECREASE && newState != 
RecordState.ARCHIVED) {
+                    deliveryCount--;

Review Comment:
   This doesn't work as per the method docs i.e. `The delivery count is not 
incremented if the state update is unsuccessful.`
   
   Shouldn't the change in deliveryCount happen after `validateTransition` 
check? Also can you please update the javadoc from `The delivery count is not 
incremented if the state update is unsuccessful.` => `The delivery count is not 
changed if the state update is unsuccessful.`



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3003,24 +3014,32 @@ private boolean hasOngoingStateTransition() {
          * if the state update is unsuccessful.
          *
          * @param newState The new state of the records.
-         * @param incrementDeliveryCount Whether to increment the delivery 
count.
+         * @param ops      The behavior on the delivery count.
          *
          * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
          *         helps update chaining.
          */
-        private InFlightState tryUpdateState(RecordState newState, boolean 
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+        private InFlightState tryUpdateState(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            boolean decreaseFlag = false;
             try {
+                if (ops == DeliveryCountOps.DECREASE && newState != 
RecordState.ARCHIVED) {
+                    deliveryCount--;
+                    decreaseFlag = true;
+                }
                 if (newState == RecordState.AVAILABLE && deliveryCount >= 
maxDeliveryCount) {
                     newState = RecordState.ARCHIVED;
                 }
                 state = state.validateTransition(newState);
-                if (incrementDeliveryCount && newState != 
RecordState.ARCHIVED) {
+                if (ops == DeliveryCountOps.INCREASE && newState != 
RecordState.ARCHIVED) {
                     deliveryCount++;
                 }
                 memberId = newMemberId;
                 return this;
             } catch (IllegalStateException e) {
                 log.error("Failed to update state of the records", e);
+                if (decreaseFlag) {
+                    deliveryCount++;
+                }

Review Comment:
   We shouldn't need this change if we decrement the deliveryCount post 
stateTransition validation, correct?



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -4887,14 +4889,50 @@ public void 
testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToMiddleOfBa
 
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
 
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
     }
 
+    @Test
+    public void testReleaseAcquiredRecordsDecreaseDeliveryCount() {

Review Comment:
   How this test is different from earlier tests where delivery count was being 
decremented on release?



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -3652,7 +3652,8 @@ public void testReleaseSingleRecordBatch() {
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).batchState());
-        assertEquals(1, 
sharePartition.cachedState().get(0L).batchDeliveryCount());

Review Comment:
   Can we have check `assertEquals(1, 
sharePartition.cachedState().get(0L).batchDeliveryCount());` prio calling 
`releaseAcquiredRecords` which validates that deliveryCount was first 
incremented and then latter decremented?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -197,6 +197,16 @@ public byte id() {
         }
     }
 
+    /**
+     * The DeliveryCountOps is used to specify the behavior on the delivery 
count: increase, decrease,
+     * or do nothing.
+     */
+    enum DeliveryCountOps {

Review Comment:
   ```suggestion
       private enum DeliveryCountOps {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to