squah-confluent commented on code in PR #22204:
URL: https://github.com/apache/kafka/pull/22204#discussion_r3186783689


##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java:
##########
@@ -111,14 +111,24 @@ public long generateProducerId() {
     }
 
     private void maybeRequestNextBlock() {
+        if (nextProducerIdBlock.get() != null) {
+            return;
+        }
+        // KAFKA-20114 - Acquire requestInFlight before reading backoffDeadlineMs. The response handler

Review Comment:
   formatting nit:
   ```suggestion
           }
   
           // KAFKA-20114 - Acquire requestInFlight before reading backoffDeadlineMs. The response handler
   ```



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java:
##########
@@ -111,14 +111,24 @@ public long generateProducerId() {
     }
 
     private void maybeRequestNextBlock() {
+        if (nextProducerIdBlock.get() != null) {
+            return;
+        }
+        // KAFKA-20114 - Acquire requestInFlight before reading backoffDeadlineMs. The response handler
+        // updates backoffDeadlineMs before clearing requestInFlight, so a successful CAS
+        // after that clear observes the updated backoff and avoids a premature retry.
+        if (!requestInFlight.compareAndSet(false, true)) {
+            return;
+        }
+
         var retryTimestamp = backoffDeadlineMs.get();
-        if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) {
-            // Send a request only if we reached the retry deadline, or if no deadline was set.
-            if (nextProducerIdBlock.get() == null &&
-                    requestInFlight.compareAndSet(false, true)) {
-                sendRequest();
-            }
+        var now = time.milliseconds();
+
+        if (retryTimestamp != NO_RETRY && now < retryTimestamp) {

Review Comment:
   Could we preserve the comment?
   ```suggestion
           // Don't send a request if there is a retry deadline and the deadline has not passed yet.
           if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
   ```



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java:
##########
@@ -111,14 +111,24 @@ public long generateProducerId() {
     }
 
     private void maybeRequestNextBlock() {
+        if (nextProducerIdBlock.get() != null) {
+            return;
+        }
+        // KAFKA-20114 - Acquire requestInFlight before reading backoffDeadlineMs. The response handler
+        // updates backoffDeadlineMs before clearing requestInFlight, so a successful CAS
+        // after that clear observes the updated backoff and avoids a premature retry.
+        if (!requestInFlight.compareAndSet(false, true)) {
+            return;
+        }
+
         var retryTimestamp = backoffDeadlineMs.get();
-        if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) {
-            // Send a request only if we reached the retry deadline, or if no deadline was set.
-            if (nextProducerIdBlock.get() == null &&
-                    requestInFlight.compareAndSet(false, true)) {
-                sendRequest();
-            }
+        var now = time.milliseconds();
+
+        if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
+            requestInFlight.set(false);
+            return;
         }
+        sendRequest();

Review Comment:
   formatting nit:
   ```suggestion
   
           sendRequest();
   ```
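For reference, the acquire-then-read ordering in the new `maybeRequestNextBlock` can be modeled in isolation. This is a simplified, hypothetical sketch, not the actual `RPCProducerIdManager`: the class name `RequestGateSketch`, the `LongSupplier` clock, the `NO_RETRY` and `RETRY_BACKOFF_MS` values, the `long[]` stand-in for the producer ID block, and the counter behind `sendRequest()` are all assumptions. It relies on `AtomicBoolean.compareAndSet` having the memory effects of a volatile read and write, so a CAS that observes the `false` written by `handleUnsuccessfulResponse` also observes the deadline that handler wrote just before it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the request gating in RPCProducerIdManager.
// Constant values, the clock, and the counter-based sendRequest() are assumptions.
class RequestGateSketch {
    static final long NO_RETRY = -1L;
    static final long RETRY_BACKOFF_MS = 50L; // assumed value

    final AtomicReference<long[]> nextProducerIdBlock = new AtomicReference<>();
    final AtomicBoolean requestInFlight = new AtomicBoolean(false);
    final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);
    final AtomicLong requestsSent = new AtomicLong();
    private final LongSupplier clock;

    RequestGateSketch(LongSupplier clock) {
        this.clock = clock;
    }

    void maybeRequestNextBlock() {
        if (nextProducerIdBlock.get() != null) {
            return; // a block is already buffered
        }
        // Acquire the flag first; only the winner reads the deadline.
        if (!requestInFlight.compareAndSet(false, true)) {
            return;
        }
        long retryTimestamp = backoffDeadlineMs.get();
        long now = clock.getAsLong();
        // Don't send a request if there is a retry deadline and it has not passed yet.
        if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
            requestInFlight.set(false); // release without sending
            return;
        }
        sendRequest(); // flag stays set until a response handler clears it
    }

    void sendRequest() {
        requestsSent.incrementAndGet(); // stand-in for the real RPC
    }

    void handleUnsuccessfulResponse() {
        // Publish the new deadline before releasing the flag.
        backoffDeadlineMs.set(clock.getAsLong() + RETRY_BACKOFF_MS);
        requestInFlight.set(false);
    }
}
```

With the previous order (read the deadline, then CAS), a caller could read the old deadline, get descheduled while the response handler sets a new deadline and clears the flag, and then win the CAS and send before the new deadline. Taking the flag first closes that window.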



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java:
##########
@@ -146,6 +156,9 @@ public void onTimeout() {
     private void handleUnsuccessfulResponse() {
         // There is no need to compare and set because only one thread
         // handles the AllocateProducerIds response.
+
+        // KAFKA-20114 - Update the backoff before clearing requestInFlight. maybeRequestNextBlock
+        // relies on this ordering when it acquires requestInFlight before reading the deadline.
         backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
         requestInFlight.set(false);

Review Comment:
   We clear `requestInFlight` at the end of a request in three places. Could we factor out a small method that takes the new backoff deadline and updates the atomics in the correct order?
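One possible shape for the suggested helper, as a hypothetical standalone sketch: `releaseRequestInFlight` is an illustrative name, and the class, clock, and constant values are assumptions. It covers the two call sites visible in this diff (the backoff early return in `maybeRequestNextBlock` and `handleUnsuccessfulResponse`); the third call site the comment mentions is not shown here, and the producer ID block check is omitted for brevity.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of a single "end of request" helper; names and values are assumptions.
class ReleaseHelperSketch {
    static final long NO_RETRY = -1L;
    static final long RETRY_BACKOFF_MS = 50L; // assumed value

    final AtomicBoolean requestInFlight = new AtomicBoolean(false);
    final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);
    final AtomicLong requestsSent = new AtomicLong();
    private final LongSupplier clock;

    ReleaseHelperSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // The only place that clears requestInFlight: write the deadline first, then
    // release the flag, so a caller whose CAS succeeds afterwards sees the deadline.
    private void releaseRequestInFlight(long newBackoffDeadlineMs) {
        backoffDeadlineMs.set(newBackoffDeadlineMs);
        requestInFlight.set(false);
    }

    void maybeRequestNextBlock() {
        if (!requestInFlight.compareAndSet(false, true)) {
            return;
        }
        long retryTimestamp = backoffDeadlineMs.get();
        if (retryTimestamp != NO_RETRY && clock.getAsLong() < retryTimestamp) {
            // Still backing off: release the flag and keep the deadline unchanged.
            releaseRequestInFlight(retryTimestamp);
            return;
        }
        requestsSent.incrementAndGet(); // stand-in for sendRequest()
    }

    void handleUnsuccessfulResponse() {
        releaseRequestInFlight(clock.getAsLong() + RETRY_BACKOFF_MS);
    }
}
```

Routing every release through one method keeps the write-deadline-then-clear-flag order in a single place, so a future call site cannot accidentally reverse it.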



-- 
This is an automated message from the Apache Git Service.