apoorvmittal10 commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2075278984

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -96,6 +97,10 @@ public class DelayedShareFetch extends DelayedOperation {
      * Metric for the rate of expired delayed fetch requests.
      */
     private final Meter expiredRequestMeter;
+    /**
+     * fetchId serves as a token while acquiring/releasing share partition's fetch lock from a DelayedShareFetch instance.

Review Comment:
   ```suggestion
        * fetchId serves as a token while acquiring/releasing share partition's fetch lock.
   ```

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -376,20 +384,22 @@ LinkedHashMap<TopicIdPartition, Long> acquirablePartitions(
         sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> {
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
-            if (sharePartition.maybeAcquireFetchLock()) {
+            if (sharePartition.maybeAcquireFetchLock(fetchId)) {
                 try {
+                    log.trace("Fetch lock for share partition {}-{} has been acquired by {}", shareFetch.groupId(), topicIdPartition, fetchId);
                     // If the share partition is already at capacity, we should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
                         topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
                     } else {
-                        sharePartition.releaseFetchLock();
-                        log.trace("Record lock partition limit exceeded for SharePartition {}, " +
-                            "cannot acquire more records", sharePartition);
+                        sharePartition.releaseFetchLock(fetchId);
+                        log.trace("Record lock partition limit exceeded for SharePartition {}-{}, " +
+                            "cannot acquire more records. Releasing the fetch lock by {}", shareFetch.groupId(), topicIdPartition, fetchId);
                     }
                 } catch (Exception e) {
                     log.error("Error checking condition for SharePartition: {}", sharePartition, e);

Review Comment:
   While you are in the file, can you fix the logs where we log `sharePartition`? Those log lines will be of no help.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,13 +1340,14 @@ boolean canAcquireRecords() {
      * share partition is not fetched concurrently by multiple clients. The fetch lock is released once
      * the records are fetched and acquired.
      *
+     * @param fetchId - the DelayedShareFetch instance uuid that is trying to acquire the fetch lock.

Review Comment:
   Why do we need to talk about `DelayedShareFetch`? It's not relevant in SharePartition. It's just a uuid which represents the caller's id.

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
         assertEquals(1, actual.get(3).producerId());
     }

+    @Test
+    public void testFetchLockReleasedByDifferentId() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+        Uuid fetchId1 = mock(Uuid.class);
+        Uuid fetchId2 = mock(Uuid.class);
+
+        // Initially, fetch lock is not acquired.
+        assertNull(sharePartition.fetchLock());
+        // fetchId1 acquires the fetch lock.
+        assertTrue(sharePartition.maybeAcquireFetchLock(fetchId1));
+        // If we release fetch lock by fetchId2, it will work.
+        sharePartition.releaseFetchLock(fetchId2);
+        assertNull(sharePartition.fetchLock()); // Fetch lock has been released.

Review Comment:
   Please write comments here explaining why this currently releases the lock. Once we make the lock handling strict, this test case will need to be updated.

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1647,14 +1649,15 @@ public void testRecordFetchLockRatioMetric() {
             .thenReturn(80L) // for time when lock is released
             .thenReturn(160L); // to update lock idle duration while acquiring lock again.
-        assertTrue(sharePartition.maybeAcquireFetchLock());
-        sharePartition.releaseFetchLock();
+        Uuid fetchId = mock(Uuid.class);

Review Comment:
   Is mocking really helpful with Uuid in any way?

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
         assertEquals(1, actual.get(3).producerId());
     }

+    @Test
+    public void testFetchLockReleasedByDifferentId() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+        Uuid fetchId1 = mock(Uuid.class);
+        Uuid fetchId2 = mock(Uuid.class);

Review Comment:
   nit: Can't this just be `Uuid.randomUuid()`? Why mock?
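
To make the two test comments above concrete, here is a minimal sketch of a token-based fetch lock with the lenient release that `testFetchLockReleasedByDifferentId` exercises. It is not the `SharePartition` code from this PR: `FetchLockSketch` is a hypothetical class, `java.util.UUID` stands in for Kafka's `Uuid` so the snippet compiles on its own, and the `boolean` returned by `releaseFetchLock` is an assumption added so that a caller could log a mismatch.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical stand-in for the share partition's fetch lock.
 * Illustrative only; not the SharePartition implementation.
 */
final class FetchLockSketch {
    // Id of the current lock holder, or null when the lock is free.
    private final AtomicReference<UUID> fetchLock = new AtomicReference<>();

    /** Acquires the lock for fetchId only if nobody currently holds it. */
    boolean maybeAcquireFetchLock(UUID fetchId) {
        return fetchLock.compareAndSet(null, fetchId);
    }

    /**
     * Lenient release (assumed behaviour, mirroring the test under review):
     * the lock is freed even when fetchId is not the holder.
     * Returns true only if the caller was the holder.
     */
    boolean releaseFetchLock(UUID fetchId) {
        UUID holder = fetchLock.getAndSet(null);
        return fetchId.equals(holder);
    }

    /** Current holder, or null when the lock is free. */
    UUID fetchLock() {
        return fetchLock.get();
    }
}
```

With real random ids instead of mocks, the scenario reads the same as in the test: `fetchId1` acquires, a release by `fetchId2` still frees the lock, and only the return value differs. If lock handling becomes strict later, the release would presumably turn into `compareAndSet(fetchId, null)`, and the final assertion of that test would flip.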