jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1164399008


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return
 
+        case callback: RequestChannel.CallbackRequest =>
+          try {
+            val originalRequest = callback.originalRequest
+            
+            // If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+            // new dequeue time. This will allow calculation of multiple 
callback times.
+            // Otherwise, set dequeue time to now.
+            if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+              val prevCallbacksTimeNanos = 
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - 
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+              originalRequest.callbackRequestCompleteTimeNanos = None
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+            } else {
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds())
+            }
+            
+            currentRequest.set(originalRequest)
+            callback.fun()
+            if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)

Review Comment:
   I'm not sure I follow the first part since right now, we need to account for 
all the callback local times. Unless we change to a cumulative callback total 
time where we periodically add to it, we will need to update either on the send 
OR when the callback completes (if we don't send yet)
   
   I'm also not sure about waiting until the callback completes because that is 
after we send via the network client and that is no longer considered "local 
time"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to