apoorvmittal10 commented on code in PR #19430: URL: https://github.com/apache/kafka/pull/19430#discussion_r2052861184
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -197,6 +197,16 @@ public byte id() { } } + /** + * The DeliveryCountOps is used to specify the behavior on the delivery count: increase, decrease, + * or do nothing. + */ + enum DeliveryCountOps { + INCREASE, + DECREASE, + NOOPS Review Comment: will it be better? ```suggestion NO_OP ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -3003,24 +3014,32 @@ private boolean hasOngoingStateTransition() { * if the state update is unsuccessful. * * @param newState The new state of the records. - * @param incrementDeliveryCount Whether to increment the delivery count. + * @param ops The behavior on the delivery count. * * @return {@code InFlightState} if update succeeds, null otherwise. Returning state * helps update chaining. */ - private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) { + private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) { + boolean decreaseFlag = false; try { + if (ops == DeliveryCountOps.DECREASE && newState != RecordState.ARCHIVED) { + deliveryCount--; Review Comment: This doesn't work as per the method docs i.e. `The delivery count is not incremented if the state update is unsuccessful.` Shouldn't the change in deliveryCount happen after `validateTransition` check? Also can you please update the javadoc from `The delivery count is not incremented if the state update is unsuccessful.` => `The delivery count is not changed if the state update is unsuccessful.` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -3003,24 +3014,32 @@ private boolean hasOngoingStateTransition() { * if the state update is unsuccessful. * * @param newState The new state of the records. - * @param incrementDeliveryCount Whether to increment the delivery count. 
+ * @param ops The behavior on the delivery count. * * @return {@code InFlightState} if update succeeds, null otherwise. Returning state * helps update chaining. */ - private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) { + private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) { + boolean decreaseFlag = false; try { + if (ops == DeliveryCountOps.DECREASE && newState != RecordState.ARCHIVED) { + deliveryCount--; + decreaseFlag = true; + } if (newState == RecordState.AVAILABLE && deliveryCount >= maxDeliveryCount) { newState = RecordState.ARCHIVED; } state = state.validateTransition(newState); - if (incrementDeliveryCount && newState != RecordState.ARCHIVED) { + if (ops == DeliveryCountOps.INCREASE && newState != RecordState.ARCHIVED) { deliveryCount++; } memberId = newMemberId; return this; } catch (IllegalStateException e) { log.error("Failed to update state of the records", e); + if (decreaseFlag) { + deliveryCount++; + } Review Comment: We shouldn't need this change if we decrement the deliveryCount post stateTransition validation, correct? 
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -4887,14 +4889,50 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToMiddleOfBa Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID)); assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); } + @Test + public void testReleaseAcquiredRecordsDecreaseDeliveryCount() { Review Comment: How is this test different from earlier tests where delivery count was being decremented on release? 
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -3652,7 +3652,8 @@ public void testReleaseSingleRecordBatch() { assertEquals(0, sharePartition.nextFetchOffset()); assertEquals(1, sharePartition.cachedState().size()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState()); - assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount()); Review Comment: Can we have a check `assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());` prior to calling `releaseAcquiredRecords`, which validates that deliveryCount was first incremented and then later decremented? ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -197,6 +197,16 @@ public byte id() { } } + /** + * The DeliveryCountOps is used to specify the behavior on the delivery count: increase, decrease, + * or do nothing. + */ + enum DeliveryCountOps { Review Comment: ```suggestion private enum DeliveryCountOps { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org