apoorvmittal10 commented on code in PR #19430:
URL: https://github.com/apache/kafka/pull/19430#discussion_r2063589486


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2999,23 +3010,32 @@ private boolean hasOngoingStateTransition() {
 
         /**
          * Try to update the state of the records. The state of the records 
can only be updated if the
-         * new state is allowed to be transitioned from old state. The 
delivery count is not incremented
+         * new state is allowed to be transitioned from old state. The 
delivery count is not changed
          * if the state update is unsuccessful.
          *
          * @param newState The new state of the records.
-         * @param incrementDeliveryCount Whether to increment the delivery 
count.
+         * @param ops      The behavior on the delivery count.
          *
          * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
          *         helps update chaining.
          */
-        private InFlightState tryUpdateState(RecordState newState, boolean 
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+        private InFlightState tryUpdateState(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
             try {
-                if (newState == RecordState.AVAILABLE && deliveryCount >= 
maxDeliveryCount) {
+                if (newState == RecordState.AVAILABLE && ops != 
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
                     newState = RecordState.ARCHIVED;
                 }
                 state = state.validateTransition(newState);
-                if (incrementDeliveryCount && newState != 
RecordState.ARCHIVED) {
-                    deliveryCount++;
+                if (newState != RecordState.ARCHIVED) {
+                    switch (ops) {
+                        case INCREASE -> deliveryCount++;
+                        case DECREASE -> deliveryCount--;
+                        // do nothing
+                        case NO_OP -> { }
+                        default -> {
+                            log.error("Invalid ops for delivery count.");
+                            return null;
+                        }
+                    }

Review Comment:
   nit: Is below cleaner? You won't require unneccessary `default` statement 
and method will be cleaner as well.
   
   ```
   deliveryCount = updatedDeliveryCount(ops);
   ```
   
   ```
   private int updatedDeliveryCount(DeliveryCountOps ops) {
               return switch (ops) {
                   case INCREASE -> deliveryCount + 1;
                   case DECREASE -> deliveryCount - 1;
                   // do nothing
                   case NO_OP -> deliveryCount;
               };
           }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to