kevin-wu24 commented on code in PR #21619:
URL: https://github.com/apache/kafka/pull/21619#discussion_r2880308894


##########
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##########
@@ -267,6 +267,7 @@ class ControllerRegistrationManager(
     }
 
     override def onTimeout(): Unit = {
+      pendingRpc = false

Review Comment:
   > How about the response sent to the broker? The response sent to the broker 
is important since that is what stops the broker from rescheduling registration 
requests.
   
   The controller who is registering looks for its 
`ControllerRegistrationRecord` in the metadata log to stop rescheduling 
requests. The `ControllerRegistrationManager` is a `MetadataPublisher`, and its 
`onMetadataUpdate` will append an event. After the registering controller sees 
its `ControllerRegistrationRecord` in the metadata log, it will stop sending 
requests. When the registering controller receives a response from the active 
controller, it logs a message, and updates some internal state. Importantly, it 
does not schedule another request. Instead, upon receiving a metadata update, 
that is when the registering controller re-enters 
`maybeSendControllerRegistration`.
   
   > I was trying to affirm that if onTimeout is called, it means that the 
request was not sent and will not be sent.
   
   Yeah, I see in `NodeToControllerRequestThread#generateRequests`, we call 
`request.callback().onTimeout()` instead of adding the request to 
`InterBrokerSendThread#unsentRequests`. This means that the registration 
request is not sent and will not be sent over the wire when this callback is 
invoked.
   
   As for the case where the request is sent over the wire because we have an 
established connection (i.e. we execute the code in 
`InterBrokerSendThread#sendRequests`, but no response is received before the 
request times out), I see this code in the `NetworkClient`:
   ```
   private void handleTimedOutRequests(List<ClientResponse> responses, long 
now) {
           List<String> nodeIds = 
this.inFlightRequests.nodesWithTimedOutRequests(now);
           for (String nodeId : nodeIds) {
               // close connection to the node
               this.selector.close(nodeId);
               log.info("Disconnecting from node {} due to request timeout.", 
nodeId);
               processTimeoutDisconnection(responses, nodeId, now);
           }
       }
   ```
   To me, the code trace looks something like the following:
   ```
   NetworkClient#processDisconnection(responses = empty ArrayList, nodeId, now, 
ChannelState.LOCAL_CLOSE, timedOut = true);
   ...
   in NetworkClient#cancelInFlightRequests(nodeId, now, responses, timedOut = 
true):
   if (timedOut)
                           clientResponse = request.timedOut(now);
   responses.add(clientResponse);
   ...
   in NodeToControllerRequesetThread#handleResponse:
   else {
               queueItem.callback().onComplete(response);
           }
   ...
   in ControllerRegistrationManager.RegistrationResponseHandler#onComplete:
   else if (response.responseBody() == null) {
           error(s"RegistrationResponseHandler: unknown error")
           scheduleNextCommunicationAfterFailure()
   ```
   Importantly, we do not call 
`NodeToControllerRequestThread#maybeDisconnectAndUpdateController` when 
handling the timed out response in this case. This means the next scheduled 
`ControllerRegistrationRequest` would go to the same controller endpoint. I am 
not sure if this is necessarily an issue though... (if we can establish a 
connection with a node, the assumption is we can complete a RPC to that node 
right?).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to