artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1147156380


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -456,9 +460,16 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
-  /** Get the next request or block until specified time has elapsed */
-  def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
-    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
+  /** Get the next request or block until specified time has elapsed 
+   *  Check the callback queue and execute first if present since these
+   *  requests have already waited in line. */
+  def receiveRequest(timeout: Long): RequestChannel.BaseRequest = {
+    val callbackRequest = callbackQueue.poll()
+    if (callbackRequest != null)
+      callbackRequest
+    else 
+      requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

Review Comment:
   We could probably handle WakeupRequest in this function, so that the wakeup 
mechanism is encapsulated in RequestChannel (i.e. check if we got a wakeup 
request from the requestQueue and poll the callbackQueue again in that case).



##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -481,6 +493,11 @@ class RequestChannel(val queueSize: Int,
 
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 
+  def sendCallbackRequest(request: CallbackRequest): Unit = {
+    callbackQueue.put(request)
+    requestQueue.put(RequestChannel.WakeupRequest)

Review Comment:
   This should be .offer -- we don't need to block if the request queue is 
full, and it's ok if we don't have a wakeup request in a full queue -- the 
queue would would contain a request (due to the fact that it's full) to wake up 
the poll.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to