jsancio commented on code in PR #19668:
URL: https://github.com/apache/kafka/pull/19668#discussion_r2098412030


##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -42,7 +42,12 @@
  * {@code retryBackoffMs}.
  */
 public class RequestManager {
-    private final Map<String, ConnectionState> connections = new HashMap<>();
+    private record NodeAndRequestKey(String nodeId, ApiKeys requestKey) {
+        private static NodeAndRequestKey of(Node node, ApiKeys requestKey) {
+            return new NodeAndRequestKey(node.idString(), requestKey);
+        }
+    }
+    private final Map<NodeAndRequestKey, ConnectionState> connections = new HashMap<>();

Review Comment:
   These are no longer connections and connection states. Maybe 
`inflightRequests` and `RequestState` would be more accurate names.



##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -62,21 +67,28 @@ public RequestManager(
     }
 
     /**
-     * Returns true if there are any connections with pending requests.
+     * Returns true if there are any connections with pending requests for a request type.
      *
-     * This is useful for satisfying the invariant that there is only one pending Fetch request.
+     * This is useful for satisfying the invariant that there is only one pending Fetch
+     * and FetchSnapshot request.
      * If there are more than one pending fetch request, it is possible for the follower to write
      * the same offset twice.
      *
      * @param currentTimeMs the current time
+     * @param requestKey the api key for the request
      * @return true if the request manager is tracking at least one request
      */
-    public boolean hasAnyInflightRequest(long currentTimeMs) {
+    public boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys requestKey) {

Review Comment:
   How about the signature `hasInflightRequest(long, Set<ApiKeys>)`? Then, in 
this method, you can check the request state with 
`connection.hasInflightRequest(currentTimeMs, requestKeys)`.
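
A minimal sketch of how a set-based signature could look. Everything here is an assumption for illustration: the nested `ApiKeys` enum stands in for Kafka's `ApiKeys`, `RequestState` is the name proposed earlier in this review, and the timeout fields are simplified placeholders rather than the real state tracked by `RequestManager`.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; simplified stand-ins, not the RequestManager from the PR.
final class InflightRequestSketch {
    // Stand-in for org.apache.kafka.common.protocol.ApiKeys.
    enum ApiKeys { FETCH, FETCH_SNAPSHOT, VOTE }

    // Stand-in for the per-node, per-request state.
    static final class RequestState {
        private final ApiKeys requestKey;
        private final long sentTimeMs;
        private final long requestTimeoutMs;

        RequestState(ApiKeys requestKey, long sentTimeMs, long requestTimeoutMs) {
            this.requestKey = requestKey;
            this.sentTimeMs = sentTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }

        // True if this request is one of the given types and has not timed out.
        boolean hasInflightRequest(long currentTimeMs, Set<ApiKeys> requestKeys) {
            return requestKeys.contains(requestKey)
                && currentTimeMs - sentTimeMs < requestTimeoutMs;
        }
    }

    // Keyed by "nodeId/requestKey" to keep the sketch short.
    private final Map<String, RequestState> inflightRequests = new HashMap<>();

    void onRequestSent(String nodeId, ApiKeys requestKey, long timeMs, long requestTimeoutMs) {
        inflightRequests.put(
            nodeId + "/" + requestKey,
            new RequestState(requestKey, timeMs, requestTimeoutMs)
        );
    }

    // One call covers every request type in the set.
    boolean hasInflightRequest(long currentTimeMs, Set<ApiKeys> requestKeys) {
        for (RequestState state : inflightRequests.values()) {
            if (state.hasInflightRequest(currentTimeMs, requestKeys)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        InflightRequestSketch manager = new InflightRequestSketch();
        manager.onRequestSent("1", ApiKeys.FETCH_SNAPSHOT, 0L, 100L);
        Set<ApiKeys> fetchKeys = EnumSet.of(ApiKeys.FETCH, ApiKeys.FETCH_SNAPSHOT);
        System.out.println(manager.hasInflightRequest(50L, fetchKeys)); // prints true
    }
}
```

With a set, the caller can ask about Fetch and FetchSnapshot in one call instead of one call per request type.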



##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -62,21 +67,28 @@ public RequestManager(
     }
 
     /**
-     * Returns true if there are any connections with pending requests.
+     * Returns true if there are any connections with pending requests for a request type.
      *
-     * This is useful for satisfying the invariant that there is only one pending Fetch request.
+     * This is useful for satisfying the invariant that there is only one pending Fetch
+     * and FetchSnapshot request.
      * If there are more than one pending fetch request, it is possible for the follower to write
      * the same offset twice.
      *
      * @param currentTimeMs the current time
+     * @param requestKey the api key for the request
      * @return true if the request manager is tracking at least one request
      */
-    public boolean hasAnyInflightRequest(long currentTimeMs) {
+    public boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys requestKey) {
         boolean result = false;
 
-        Iterator<ConnectionState> iterator = connections.values().iterator();
+        final var iterator = connections.entrySet().iterator();
         while (iterator.hasNext()) {
-            ConnectionState connection = iterator.next();
+            final var entry = iterator.next();
+            final var nodeAndRequestKey = entry.getKey();
+            final var connection = entry.getValue();
+            if (!nodeAndRequestKey.requestKey.equals(requestKey)) {
+                continue;
+            }

Review Comment:
   Did you consider not skipping these entries so that we can remove them 
proactively?
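
A minimal sketch of the proactive-removal idea, assuming that "stale" means the request has timed out. The nested `ApiKeys` enum, the `RequestState` class, and its fields are hypothetical stand-ins, and backoff handling is left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch; simplified stand-ins, not the code from the PR.
final class ProactiveCleanupSketch {
    // Stand-in for org.apache.kafka.common.protocol.ApiKeys.
    enum ApiKeys { FETCH, FETCH_SNAPSHOT, VOTE }

    static final class RequestState {
        final ApiKeys requestKey;
        final long sentTimeMs;
        final long requestTimeoutMs;

        RequestState(ApiKeys requestKey, long sentTimeMs, long requestTimeoutMs) {
            this.requestKey = requestKey;
            this.sentTimeMs = sentTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }

        boolean hasRequestTimedOut(long timeMs) {
            return timeMs - sentTimeMs >= requestTimeoutMs;
        }
    }

    // Keyed by "nodeId/requestKey" to keep the sketch short.
    final Map<String, RequestState> inflightRequests = new HashMap<>();

    boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys requestKey) {
        boolean result = false;
        Iterator<RequestState> iterator = inflightRequests.values().iterator();
        while (iterator.hasNext()) {
            RequestState state = iterator.next();
            if (state.hasRequestTimedOut(currentTimeMs)) {
                // Drop stale entries of every request type, not only the one asked about.
                iterator.remove();
            } else if (state.requestKey == requestKey) {
                // Keep scanning so the remaining stale entries are still removed.
                result = true;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ProactiveCleanupSketch manager = new ProactiveCleanupSketch();
        manager.inflightRequests.put("1/VOTE", new RequestState(ApiKeys.VOTE, 0L, 100L));
        manager.inflightRequests.put("2/FETCH", new RequestState(ApiKeys.FETCH, 150L, 100L));
        System.out.println(manager.hasAnyInflightRequest(200L, ApiKeys.FETCH)); // prints true
        System.out.println(manager.inflightRequests.keySet()); // prints [2/FETCH]
    }
}
```

The trade-off in this version is that every call scans the whole map instead of stopping at the first match.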



##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -173,58 +194,58 @@ public long backoffBeforeAvailableBootstrapServer(long currentTimeMs) {
         return minBackoffMs;
     }
 
-    public boolean hasRequestTimedOut(Node node, long timeMs) {
-        ConnectionState state = connections.get(node.idString());
+    public boolean hasRequestTimedOut(Node node, long timeMs, ApiKeys requestKey) {
+        ConnectionState state = connections.get(NodeAndRequestKey.of(node, requestKey));
         if (state == null) {
             return false;
         }
 
         return state.hasRequestTimedOut(timeMs);
     }
 
-    public boolean isReady(Node node, long timeMs) {
-        ConnectionState state = connections.get(node.idString());
+    public boolean isReady(Node node, long timeMs, ApiKeys requestKey) {
+        ConnectionState state = connections.get(NodeAndRequestKey.of(node, requestKey));
         if (state == null) {
             return true;
         }
 
         boolean ready = state.isReady(timeMs);
         if (ready) {
-            reset(node);
+            reset(node, requestKey);
         }
 
         return ready;
     }
 
-    public boolean isBackingOff(Node node, long timeMs) {
-        ConnectionState state = connections.get(node.idString());
+    public boolean isBackingOff(Node node, long timeMs, ApiKeys requestKey) {
+        ConnectionState state = connections.get(NodeAndRequestKey.of(node, requestKey));
         if (state == null) {
             return false;
         }
 
         return state.isBackingOff(timeMs);
     }
 
-    public long remainingRequestTimeMs(Node node, long timeMs) {
-        ConnectionState state = connections.get(node.idString());
+    public long remainingRequestTimeMs(Node node, long timeMs, ApiKeys requestKey) {
+        ConnectionState state = connections.get(NodeAndRequestKey.of(node, requestKey));
         if (state == null) {
             return 0;
         }
 
         return state.remainingRequestTimeMs(timeMs);
     }
 
-    public long remainingBackoffMs(Node node, long timeMs) {
-        ConnectionState state = connections.get(node.idString());
+    public long remainingBackoffMs(Node node, long timeMs, ApiKeys requestKey) {
+        ConnectionState state = connections.get(NodeAndRequestKey.of(node, requestKey));
         if (state == null) {
             return 0;
         }
 
         return state.remainingBackoffMs(timeMs);
     }
 
-    public boolean isResponseExpected(Node node, long correlationId) {
-        ConnectionState state = connections.get(node.idString());
+    public boolean isResponseExpected(Node node, long correlationId, ApiKeys requestKey) {

Review Comment:
   Let's add Javadoc for all of these public methods: `isResponseExpected`, 
`remainingBackoffMs`, `remainingRequestTimeMs`, `isBackingOff`, `isReady` and 
`hasRequestTimedOut`.



##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -258,18 +287,19 @@ public void onResponseResult(Node node, long correlationId, boolean success, lon
      * @param node the destination of the request
      * @param correlationId the correlation id of the request
      * @param timeMs the current time
+     * @param requestKey the api key for the request
      */
-    public void onRequestSent(Node node, long correlationId, long timeMs) {
+    public void onRequestSent(Node node, long correlationId, long timeMs, ApiKeys requestKey) {
         ConnectionState state = connections.computeIfAbsent(
-            node.idString(),
+            NodeAndRequestKey.of(node, requestKey),
             key -> new ConnectionState(node, retryBackoffMs, requestTimeoutMs)
         );
 
         state.onRequestSent(correlationId, timeMs);
     }
 
-    public void reset(Node node) {
-        connections.remove(node.idString());
+    public void reset(Node node, ApiKeys requestKey) {

Review Comment:
   Let's add Javadoc for this method.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2762,25 +2767,27 @@ private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
      * @param currentTimeMs the current time
      * @param destination the node receiving the request
      * @param requestSupplier the function that creates the request
+     * @param api the type of request to maybe send
      * @return the first element in the pair indicates if the request was sent; the second element
      *         in the pair indicates the time to wait before retrying.
      */
     private RequestSendResult maybeSendRequest(
         long currentTimeMs,
         Node destination,
-        Supplier<ApiMessage> requestSupplier
+        Supplier<ApiMessage> requestSupplier,
+        ApiKeys api

Review Comment:
   How about `apiKey` for the parameter name?
   ```java
           ApiKeys apiKey
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2762,25 +2767,27 @@ private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
      * @param currentTimeMs the current time
      * @param destination the node receiving the request
      * @param requestSupplier the function that creates the request
+     * @param api the type of request to maybe send
      * @return the first element in the pair indicates if the request was sent; the second element
      *         in the pair indicates the time to wait before retrying.
      */
     private RequestSendResult maybeSendRequest(
         long currentTimeMs,
         Node destination,
-        Supplier<ApiMessage> requestSupplier
+        Supplier<ApiMessage> requestSupplier,
+        ApiKeys api
     )  {
         var requestSent = false;
 
-        if (requestManager.isBackingOff(destination, currentTimeMs)) {
-            long remainingBackoffMs = requestManager.remainingBackoffMs(destination, currentTimeMs);
+        if (requestManager.isBackingOff(destination, currentTimeMs, api)) {
+            long remainingBackoffMs = requestManager.remainingBackoffMs(destination, currentTimeMs, api);
             logger.debug("Connection for {} is backing off for {} ms", destination, remainingBackoffMs);
             return RequestSendResult.of(requestSent, remainingBackoffMs);
         }
 
-        if (requestManager.isReady(destination, currentTimeMs)) {
+        if (requestManager.isReady(destination, currentTimeMs, api)) {

Review Comment:
   Is this true when the request is FETCH or FETCH_SNAPSHOT? Don't we need to 
make sure that neither FETCH nor FETCH_SNAPSHOT is pending?
   
   I am wondering if this grouping of FETCH and FETCH_SNAPSHOT should be 
encoded in the request manager.
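
One way such a grouping could be encoded, as a rough sketch only. The `groupOf` helper, the `FETCH_GROUP` constant, and the string-keyed `pending` set are all hypothetical, the nested `ApiKeys` enum stands in for Kafka's, and timeouts and backoff are ignored to keep the readiness check visible.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; simplified stand-ins, not the RequestManager from the PR.
final class RequestGroupSketch {
    // Stand-in for org.apache.kafka.common.protocol.ApiKeys.
    enum ApiKeys { FETCH, FETCH_SNAPSHOT, VOTE }

    // Request types that must never be pending at the same time.
    private static final Set<ApiKeys> FETCH_GROUP =
        Collections.unmodifiableSet(EnumSet.of(ApiKeys.FETCH, ApiKeys.FETCH_SNAPSHOT));

    // Pending requests keyed by "nodeId/requestKey".
    private final Set<String> pending = new HashSet<>();

    // Every key outside the fetch group forms a group of its own.
    static Set<ApiKeys> groupOf(ApiKeys requestKey) {
        return FETCH_GROUP.contains(requestKey) ? FETCH_GROUP : EnumSet.of(requestKey);
    }

    void onRequestSent(String nodeId, ApiKeys requestKey) {
        pending.add(nodeId + "/" + requestKey);
    }

    void onResponseReceived(String nodeId, ApiKeys requestKey) {
        pending.remove(nodeId + "/" + requestKey);
    }

    // Ready only if no request from the same group is pending for the node.
    boolean isReady(String nodeId, ApiKeys requestKey) {
        for (ApiKeys member : groupOf(requestKey)) {
            if (pending.contains(nodeId + "/" + member)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RequestGroupSketch manager = new RequestGroupSketch();
        manager.onRequestSent("1", ApiKeys.FETCH_SNAPSHOT);
        System.out.println(manager.isReady("1", ApiKeys.FETCH)); // prints false
        System.out.println(manager.isReady("1", ApiKeys.VOTE));  // prints true
    }
}
```

Keeping the grouping inside the manager means callers such as `maybeSendRequest` would not need to know which request types exclude each other.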



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2762,25 +2767,27 @@ private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
      * @param currentTimeMs the current time
      * @param destination the node receiving the request
      * @param requestSupplier the function that creates the request
+     * @param api the type of request to maybe send
      * @return the first element in the pair indicates if the request was sent; the second element
      *         in the pair indicates the time to wait before retrying.
      */
     private RequestSendResult maybeSendRequest(
         long currentTimeMs,
         Node destination,
-        Supplier<ApiMessage> requestSupplier
+        Supplier<ApiMessage> requestSupplier,
+        ApiKeys api

Review Comment:
   How about merging these two states into one object? E.g.
   ```java
   final class RequestSupplier {
       RequestSupplier(Supplier<ApiMessage> supplier, ApiKeys apiKey);
   
       public ApiMessage request();
       public ApiKeys apiKey();
   }
   ```
   
   You can add a check to `RequestSupplier#request` so that it verifies that the 
resulting message matches the `apiKey`.
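
A fuller, runnable sketch of that shape, under stated assumptions: the nested `ApiKeys` and `ApiMessage` types are stand-ins for the Kafka types of the same name, the numeric ids are for illustration only, and throwing `IllegalStateException` on a mismatch is one possible choice rather than something this review prescribes.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch; simplified stand-ins, not the code from the PR.
final class RequestSupplierSketch {
    // Stand-in for org.apache.kafka.common.protocol.ApiKeys; ids are illustrative.
    enum ApiKeys {
        FETCH((short) 1),
        FETCH_SNAPSHOT((short) 59);

        final short id;

        ApiKeys(short id) {
            this.id = id;
        }
    }

    // Stand-in for org.apache.kafka.common.protocol.ApiMessage.
    interface ApiMessage {
        short apiKey();
    }

    static final class RequestSupplier {
        private final Supplier<ApiMessage> supplier;
        private final ApiKeys apiKey;

        RequestSupplier(Supplier<ApiMessage> supplier, ApiKeys apiKey) {
            this.supplier = Objects.requireNonNull(supplier);
            this.apiKey = Objects.requireNonNull(apiKey);
        }

        // Creates the request and verifies that it matches the declared api key.
        ApiMessage request() {
            ApiMessage message = supplier.get();
            if (message.apiKey() != apiKey.id) {
                throw new IllegalStateException(
                    "Expected a " + apiKey + " request but the supplier created api key " + message.apiKey()
                );
            }
            return message;
        }

        ApiKeys apiKey() {
            return apiKey;
        }
    }

    public static void main(String[] args) {
        ApiMessage fetch = () -> ApiKeys.FETCH.id;
        RequestSupplier matching = new RequestSupplier(() -> fetch, ApiKeys.FETCH);
        System.out.println(matching.request().apiKey()); // prints 1

        RequestSupplier mismatched = new RequestSupplier(() -> fetch, ApiKeys.FETCH_SNAPSHOT);
        try {
            mismatched.request();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Bundling the two values means a caller cannot pass a Fetch supplier together with a FetchSnapshot key without the mismatch being caught when the request is created.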



