kevin-wu24 commented on code in PR #19668: URL: https://github.com/apache/kafka/pull/19668#discussion_r2114387246
########## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ########## @@ -62,30 +70,35 @@ public RequestManager( } /** - * Returns true if there are any connections with pending requests. + * Returns true if there are any in-flight requests for a request type. * - * This is useful for satisfying the invariant that there is only one pending Fetch request. + * This is useful for satisfying the invariant that there is only one pending Fetch + * and FetchSnapshot request. * If there are more than one pending fetch request, it is possible for the follower to write * the same offset twice. * * @param currentTimeMs the current time - * @return true if the request manager is tracking at least one request + * @param wantRequestKey the request type to check for in-flight requests + * @return true if the request manager is tracking at least one request of the given type */ - public boolean hasAnyInflightRequest(long currentTimeMs) { + public boolean hasAnyInflightRequest(long currentTimeMs, ApiKeys wantRequestKey) { boolean result = false; - Iterator<ConnectionState> iterator = connections.values().iterator(); + final var iterator = inflightRequests.entrySet().iterator(); while (iterator.hasNext()) { - ConnectionState connection = iterator.next(); - if (connection.hasRequestTimedOut(currentTimeMs)) { + final var entry = iterator.next(); + final var requestKey = entry.getKey().requestType().apiKey(); + final var requestState = entry.getValue(); + if (requestState.hasRequestTimedOut(currentTimeMs)) { // Mark the node as ready after request timeout iterator.remove(); - } else if (connection.isBackoffComplete(currentTimeMs)) { + } else if (requestState.isBackoffComplete(currentTimeMs)) { // Mark the node as ready after completed backoff iterator.remove(); - } else if (connection.hasInflightRequest(currentTimeMs)) { + } else if (requestKey == wantRequestKey && Review Comment: Now that I think about this more, I think another way to enforce this is by having the public RequestManager APIs themselves take in `RequestType`, not `ApiKeys` parameter, since `RequestType` is what the manager actually deals with. This is the only method that doesn't take the `ApiKeys` parameter from the caller and convert it to a `RequestType`. Is it better for the caller to "provide" the correct data, or for the implementation correct it itself (i.e. convert from `ApiKeys` to `RequestType` like in the other public methods)? I think the former might be better, because the code won't compile unless the `ApiKeys -> RequestType` conversion occurs, but in the latter case a bug like this can arise. However, there is code that does need the actual `ApiKeys` (i.e. fetch and fetch_snapshot are treated differently), such as the RPC handling code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org