jsancio commented on code in PR #19668:
URL: https://github.com/apache/kafka/pull/19668#discussion_r2116147585


##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -62,30 +70,35 @@ public RequestManager(
     }
 
     /**
-     * Returns true if there are any connections with pending requests.
+     * Returns true if there are any in-flight requests for a request type.
      *
-     * This is useful for satisfying the invariant that there is only one 
pending Fetch request.
+     * This is useful for satisfying the invariant that there is only one 
pending Fetch
+     * and FetchSnapshot request.
      * If there are more than one pending fetch request, it is possible for 
the follower to write
      * the same offset twice.
      *
      * @param currentTimeMs the current time
-     * @return true if the request manager is tracking at least one request
+     * @param wantRequestKey the request type to check for in-flight requests
+     * @return true if the request manager is tracking at least one request of 
the given type
      */
-    public boolean hasAnyInflightRequest(long currentTimeMs) {
+    public boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys 
wantRequestKey) {
         boolean result = false;
 
-        Iterator<ConnectionState> iterator = connections.values().iterator();
+        final var iterator = inflightRequests.entrySet().iterator();
         while (iterator.hasNext()) {
-            ConnectionState connection = iterator.next();
-            if (connection.hasRequestTimedOut(currentTimeMs)) {
+            final var entry = iterator.next();
+            final var requestKey = entry.getKey().requestType().apiKey();
+            final var requestState = entry.getValue();
+            if (requestState.hasRequestTimedOut(currentTimeMs)) {
                 // Mark the node as ready after request timeout
                 iterator.remove();
-            } else if (connection.isBackoffComplete(currentTimeMs)) {
+            } else if (requestState.isBackoffComplete(currentTimeMs)) {
                 // Mark the node as ready after completed backoff
                 iterator.remove();
-            } else if (connection.hasInflightRequest(currentTimeMs)) {
+            } else if (requestKey == wantRequestKey &&

Review Comment:
   > Now that I think about this more, I think another way to enforce this is 
by having the public RequestManager APIs themselves take in RequestType, not 
ApiKeys parameter, since RequestType is what the manager actually deals with.
   
   Yes but what the caller knows about is `ApiKeys` and `ApiMessages`. 
`RequestType` is an implementation detail of `RequestManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to