kevin-wu24 commented on code in PR #19668: URL: https://github.com/apache/kafka/pull/19668#discussion_r2114492022
########## raft/src/test/java/org/apache/kafka/raft/RequestManagerTest.java: ########## @@ -234,15 +330,61 @@ public void testFindReadyWithRequestTimedOut() { ); // Send request to a node that is not in the bootstrap list - cache.onRequestSent(otherNode, 1, time.milliseconds()); - assertTrue(cache.isResponseExpected(otherNode, 1)); + cache.onRequestSent(otherNode, 1, time.milliseconds(), fetch); + assertTrue(cache.isResponseExpected(otherNode, 1, fetch)); assertEquals(Optional.empty(), cache.findReadyBootstrapServer(time.milliseconds())); // Timeout the request time.sleep(requestTimeoutMs); Node bootstrapNode = cache.findReadyBootstrapServer(time.milliseconds()).get(); assertTrue(bootstrapList.contains(bootstrapNode)); - assertFalse(cache.isResponseExpected(otherNode, 1)); + assertFalse(cache.isResponseExpected(otherNode, 1, fetch)); + } + + @Test + public void testAnyInflightRequestWithMultipleRequestTypes() { + Node otherNode = new Node(1, "other-node", 1234); + List<Node> bootstrapList = makeBootstrapList(3); + RequestManager cache = new RequestManager( + bootstrapList, + retryBackoffMs, + requestTimeoutMs, + random + ); + + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Send a request and check state + cache.onRequestSent(otherNode, 11, time.milliseconds(), fetch); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Send the other request and check state + cache.onRequestSent(otherNode, 11, time.milliseconds(), updateVoter); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Wait until the request times out + time.sleep(requestTimeoutMs); + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + 
assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Results should not affect the connection state of other request types + cache.onRequestSent(otherNode, 12, time.milliseconds(), updateVoter); + + // Send another request and fail it + cache.onRequestSent(otherNode, 12, time.milliseconds(), fetch); + cache.onResponseResult(otherNode, 12, false, time.milliseconds(), fetch); + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Send fetch snapshot request, it should be treated the same as fetch + cache.onRequestSent(otherNode, 12, time.milliseconds(), fetchSnapshot); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); Review Comment: Yes, it should when `hasAnyInflightRequest` is fixed. ########## raft/src/test/java/org/apache/kafka/raft/RequestManagerTest.java: ########## @@ -234,15 +330,61 @@ public void testFindReadyWithRequestTimedOut() { ); // Send request to a node that is not in the bootstrap list - cache.onRequestSent(otherNode, 1, time.milliseconds()); - assertTrue(cache.isResponseExpected(otherNode, 1)); + cache.onRequestSent(otherNode, 1, time.milliseconds(), fetch); + assertTrue(cache.isResponseExpected(otherNode, 1, fetch)); assertEquals(Optional.empty(), cache.findReadyBootstrapServer(time.milliseconds())); // Timeout the request time.sleep(requestTimeoutMs); Node bootstrapNode = cache.findReadyBootstrapServer(time.milliseconds()).get(); assertTrue(bootstrapList.contains(bootstrapNode)); - assertFalse(cache.isResponseExpected(otherNode, 1)); + assertFalse(cache.isResponseExpected(otherNode, 1, fetch)); + } + + @Test + public void testAnyInflightRequestWithMultipleRequestTypes() { + Node otherNode = new Node(1, "other-node", 1234); + List<Node> bootstrapList = makeBootstrapList(3); + RequestManager cache = new RequestManager( + bootstrapList, + retryBackoffMs, + requestTimeoutMs, + random + ); + + 
assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); + assertFalse(cache.hasAnyInflightRequest(time.milliseconds(), updateVoter)); + + // Send a request and check state + cache.onRequestSent(otherNode, 11, time.milliseconds(), fetch); + assertTrue(cache.hasAnyInflightRequest(time.milliseconds(), fetch)); Review Comment: Same as the comment above: this assertion should hold once `hasAnyInflightRequest` is fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org