kevin-wu24 commented on code in PR #19668:
URL: https://github.com/apache/kafka/pull/19668#discussion_r2114387246


##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -62,30 +70,35 @@ public RequestManager(
     }
 
     /**
-     * Returns true if there are any connections with pending requests.
+     * Returns true if there are any in-flight requests for a request type.
      *
-     * This is useful for satisfying the invariant that there is only one 
pending Fetch request.
+     * This is useful for satisfying the invariant that there is only one 
pending Fetch
+     * and FetchSnapshot request.
      * If there are more than one pending fetch request, it is possible for 
the follower to write
      * the same offset twice.
      *
      * @param currentTimeMs the current time
-     * @return true if the request manager is tracking at least one request
+     * @param wantRequestKey the request type to check for in-flight requests
+     * @return true if the request manager is tracking at least one request of 
the given type
      */
-    public boolean hasAnyInflightRequest(long currentTimeMs) {
+    public boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys 
wantRequestKey) {
         boolean result = false;
 
-        Iterator<ConnectionState> iterator = connections.values().iterator();
+        final var iterator = inflightRequests.entrySet().iterator();
         while (iterator.hasNext()) {
-            ConnectionState connection = iterator.next();
-            if (connection.hasRequestTimedOut(currentTimeMs)) {
+            final var entry = iterator.next();
+            final var requestKey = entry.getKey().requestType().apiKey();
+            final var requestState = entry.getValue();
+            if (requestState.hasRequestTimedOut(currentTimeMs)) {
                 // Mark the node as ready after request timeout
                 iterator.remove();
-            } else if (connection.isBackoffComplete(currentTimeMs)) {
+            } else if (requestState.isBackoffComplete(currentTimeMs)) {
                 // Mark the node as ready after completed backoff
                 iterator.remove();
-            } else if (connection.hasInflightRequest(currentTimeMs)) {
+            } else if (requestKey == wantRequestKey &&

Review Comment:
   Now that I think about this more, I think the RequestManager APIs themselves 
should be taking in `RequestType`, not `ApiKeys` parameter, since `RequestType` 
is what the manager actually deals with. This prevents the callers of the API 
from accidentally passing `ApiKeys.FETCH_SNAPSHOT` directly, since this request 
type can technically never be present in the map (it gets encoded with the 
`FETCH` key when sent).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to