apoorvmittal10 commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2075278984


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -96,6 +97,10 @@ public class DelayedShareFetch extends DelayedOperation {
      * Metric for the rate of expired delayed fetch requests.
      */
     private final Meter expiredRequestMeter;
+    /**
+     * fetchId serves as a token while acquiring/releasing share partition's fetch lock from a DelayedShareFetch instance.

Review Comment:
   ```suggestion
        * fetchId serves as a token while acquiring/releasing share partition's fetch lock.
   ```



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -376,20 +384,22 @@ LinkedHashMap<TopicIdPartition, Long> acquirablePartitions(
         sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> {
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
-            if (sharePartition.maybeAcquireFetchLock()) {
+            if (sharePartition.maybeAcquireFetchLock(fetchId)) {
                 try {
+                    log.trace("Fetch lock for share partition {}-{} has been acquired by {}", shareFetch.groupId(), topicIdPartition, fetchId);
                     // If the share partition is already at capacity, we should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
                         topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
                     } else {
-                        sharePartition.releaseFetchLock();
-                        log.trace("Record lock partition limit exceeded for SharePartition {}, " +
-                            "cannot acquire more records", sharePartition);
+                        sharePartition.releaseFetchLock(fetchId);
+                        log.trace("Record lock partition limit exceeded for SharePartition {}-{}, " +
+                            "cannot acquire more records. Releasing the fetch lock by {}", shareFetch.groupId(), topicIdPartition, fetchId);
                     }
                 } catch (Exception e) {
                     log.error("Error checking condition for SharePartition: {}", sharePartition, e);

Review Comment:
   While you are in this file, can you fix the log lines that log `sharePartition`? Logging the object itself makes the log line unhelpful.
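
   As a rough illustration of the point, the sketch below contrasts logging an object directly with logging its identifying fields in the `{groupId}-{topicIdPartition}` form already used by the trace lines in this diff. It assumes the logged object has no useful `toString()` override; `LogKeySketch`, `PartitionLike`, and `logKey` are hypothetical names, not Kafka code.

```java
public class LogKeySketch {
    /** Hypothetical stand-in for an object without a toString() override. */
    public static final class PartitionLike {
        public final String groupId;
        public final String topicIdPartition;

        public PartitionLike(String groupId, String topicIdPartition) {
            this.groupId = groupId;
            this.topicIdPartition = topicIdPartition;
        }
    }

    /** Builds the "groupId-topicIdPartition" identifier for a log line. */
    public static String logKey(String groupId, String topicIdPartition) {
        return groupId + "-" + topicIdPartition;
    }

    public static void main(String[] args) {
        PartitionLike p = new PartitionLike("grp", "topic-0");
        // Default Object.toString(): class name plus hash code, no identifying fields.
        System.out.println("Error checking condition for SharePartition: " + p);
        // Identifying fields: tells the reader which group and partition failed.
        System.out.println("Error checking condition for SharePartition: "
            + logKey(p.groupId, p.topicIdPartition));
    }
}
```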



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,13 +1340,14 @@ boolean canAcquireRecords() {
      * share partition is not fetched concurrently by multiple clients. The fetch lock is released once
      * the records are fetched and acquired.
      *
+     * @param fetchId - the DelayedShareFetch instance uuid that is trying to acquire the fetch lock.

Review Comment:
   Why do we need to mention `DelayedShareFetch` here? It's not relevant in `SharePartition`. The parameter is just a uuid that represents the caller's id.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
         assertEquals(1, actual.get(3).producerId());
     }
 
+    @Test
+    public void testFetchLockReleasedByDifferentId() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+        Uuid fetchId1 = mock(Uuid.class);
+        Uuid fetchId2 = mock(Uuid.class);
+
+        // Initially, fetch lock is not acquired.
+        assertNull(sharePartition.fetchLock());
+        // fetchId1 acquires the fetch lock.
+        assertTrue(sharePartition.maybeAcquireFetchLock(fetchId1));
+        // If we release fetch lock by fetchId2, it will work.
+        sharePartition.releaseFetchLock(fetchId2);
+        assertNull(sharePartition.fetchLock()); // Fetch lock has been released.

Review Comment:
   Please add a comment here explaining why this currently releases the lock. Once we make the lock handling strict, this test case will need to be updated.
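
   To make the lenient versus strict distinction concrete, here is a minimal sketch of a token-based fetch lock. It is not the actual `SharePartition` implementation: the class `FetchLockSketch`, its `strict` flag, the boolean return of `releaseFetchLock`, and the use of `java.util.UUID` in place of Kafka's `Uuid` are all assumptions for illustration. In lenient mode a release by any id clears the lock, which matches what the test above asserts today; in strict mode only the holder can release.

```java
import java.util.UUID;

/** Hypothetical stand-in for a token-based fetch lock; not the real SharePartition code. */
public class FetchLockSketch {
    // Id of the current lock holder, or null when the lock is free.
    private UUID holder;
    // When true, only the id that acquired the lock may release it.
    private final boolean strict;

    public FetchLockSketch(boolean strict) {
        this.strict = strict;
    }

    /** Returns true if the lock was free and is now held by fetchId. */
    public synchronized boolean maybeAcquireFetchLock(UUID fetchId) {
        if (holder != null) {
            return false;
        }
        holder = fetchId;
        return true;
    }

    /** Returns true if the lock was released (or was already free). */
    public synchronized boolean releaseFetchLock(UUID fetchId) {
        if (strict && holder != null && !holder.equals(fetchId)) {
            // Strict mode: a caller that does not hold the lock cannot release it.
            return false;
        }
        // Lenient mode, or the holder itself: clear the lock.
        holder = null;
        return true;
    }

    /** Current holder, or null if the lock is not held. */
    public synchronized UUID fetchLock() {
        return holder;
    }
}
```

   Under these assumptions, a test like `testFetchLockReleasedByDifferentId` would pass against the lenient variant and would need its final assertion inverted once the strict variant is in place.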



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1647,14 +1649,15 @@ public void testRecordFetchLockRatioMetric() {
             .thenReturn(80L) // for time when lock is released
             .thenReturn(160L); // to update lock idle duration while acquiring lock again.
 
-        assertTrue(sharePartition.maybeAcquireFetchLock());
-        sharePartition.releaseFetchLock();
+        Uuid fetchId = mock(Uuid.class);

Review Comment:
   Is mocking really helpful with Uuid in any way?



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7134,6 +7137,23 @@ public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransa
         assertEquals(1, actual.get(3).producerId());
     }
 
+    @Test
+    public void testFetchLockReleasedByDifferentId() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+        Uuid fetchId1 = mock(Uuid.class);
+        Uuid fetchId2 = mock(Uuid.class);

Review Comment:
   nit: Can't this just be `Uuid.randomUuid()`? Why mock?
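
   A small sketch of what using real ids buys over mocks: freshly generated ids are distinct, compare by value, and print something readable in log lines. It uses `java.util.UUID.randomUUID()` as a stand-in for Kafka's `Uuid.randomUuid()` so that it runs without Kafka on the classpath; `RealIdsSketch` and `twoFetchIds` are hypothetical names.

```java
import java.util.UUID;

public class RealIdsSketch {
    /** Returns two freshly generated ids, as a test could create instead of mocks. */
    public static UUID[] twoFetchIds() {
        return new UUID[] {UUID.randomUUID(), UUID.randomUUID()};
    }

    public static void main(String[] args) {
        UUID[] ids = twoFetchIds();
        // Real ids have value-based equals(), so "different caller" is an actual property.
        System.out.println("same id? " + ids[0].equals(ids[1]));
        // Real ids also render as readable strings in trace output.
        System.out.println("fetchId1=" + ids[0] + ", fetchId2=" + ids[1]);
    }
}
```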



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
