artemlivshits commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1147156380
##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -456,9 +460,16 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
-  /** Get the next request or block until specified time has elapsed */
-  def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
-    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
+  /** Get the next request or block until specified time has elapsed
+   * Check the callback queue and execute first if present since these
+   * requests have already waited in line. */
+  def receiveRequest(timeout: Long): RequestChannel.BaseRequest = {
+    val callbackRequest = callbackQueue.poll()
+    if (callbackRequest != null)
+      callbackRequest
+    else
+      requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

Review Comment:
   We could probably handle WakeupRequest in this function, so that the wakeup mechanism is encapsulated in RequestChannel (i.e. check whether we got a wakeup request from the requestQueue and, in that case, poll the callbackQueue again).

##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -481,6 +493,11 @@ class RequestChannel(val queueSize: Int,
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 
+  def sendCallbackRequest(request: CallbackRequest): Unit = {
+    callbackQueue.put(request)
+    requestQueue.put(RequestChannel.WakeupRequest)

Review Comment:
   This should be .offer: we don't need to block if the request queue is full, and it's ok if the wakeup request doesn't make it into a full queue. A full queue already contains a request that will wake up the poll.
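
A minimal sketch of how the two suggestions above could fit together, assuming both queues are `java.util.concurrent.ArrayBlockingQueue` instances. `SketchChannel`, `Request`, the simplified `CallbackRequest(id)` and the callback queue capacity are hypothetical stand-ins for illustration, not the actual Kafka classes or the code in this PR:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-ins for the RequestChannel request types.
sealed trait BaseRequest
case object WakeupRequest extends BaseRequest
final case class CallbackRequest(id: Int) extends BaseRequest
final case class Request(id: Int) extends BaseRequest

class SketchChannel(queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  // Assumption: the callback queue capacity is arbitrary in this sketch.
  private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

  def sendRequest(request: BaseRequest): Unit = requestQueue.put(request)

  def sendCallbackRequest(request: CallbackRequest): Unit = {
    callbackQueue.put(request)
    // offer returns false instead of blocking when the queue is full.
    // A full queue already holds an element that will end a blocked poll,
    // so dropping the wakeup marker is harmless in that case.
    requestQueue.offer(WakeupRequest)
  }

  /** Returns a callback first if one is waiting, otherwise the next request,
    * or null if nothing arrives within the timeout. */
  def receiveRequest(timeout: Long): BaseRequest = {
    val callbackRequest = callbackQueue.poll()
    if (callbackRequest != null)
      callbackRequest
    else {
      val request = requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
      // The wakeup marker never leaves the channel: re-check the callback
      // queue instead. This can yield null if the callback was already taken.
      if (request eq WakeupRequest) callbackQueue.poll() else request
    }
  }
}
```

With this shape, callers of `receiveRequest` only need to treat a `null` result as "nothing to do", as they already do for a timed-out poll, and never see `WakeupRequest` themselves.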