junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1381976028


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.

Review Comment:
   Typo: `expected` => `is expected`



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)

Review Comment:
   Looking at this again, I am not sure `AsynchronousCompletionCallback` adds 
much value. It makes the already subtle code even harder to understand. We 
could probably go back to the original approach, but add the comments here to 
the javadoc for `asyncCompletionCallback` in `wrap`.
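
   A rough sketch of what that could look like, with the callback kept as a
   plain function type and the comments folded into the `@param` javadoc.
   `RequestLocal` here is a simplified stand-in and the body is a placeholder
   that always runs the callback inline; neither reflects the real
   implementation, which is not shown in this hunk.

```scala
// Hypothetical stand-in for Kafka's RequestLocal, for illustration only.
final case class RequestLocal(owner: String)

object WrapSketch {
  /**
   * Wrap callback to schedule it on an arbitrary request thread.
   * NOTE: this function must be originally called from a request thread.
   * @param asyncCompletionCallback A callback that is expected to be executed once in an
   *                                arbitrary request handler thread after an asynchronous
   *                                action completes. The RequestLocal passed in must belong
   *                                to the request handler thread that is executing the callback.
   * @param requestLocal The RequestLocal for the current request handler thread in case
   *                     we need to call the callback function without queueing it
   * @return Wrapped callback that runs `asyncCompletionCallback`
   */
  def wrap[T](asyncCompletionCallback: (RequestLocal, T) => Unit,
              requestLocal: RequestLocal): T => Unit = {
    // Placeholder body (assumption): run inline with the caller's RequestLocal.
    (t: T) => asyncCompletionCallback(requestLocal, t)
  }
}
```

   With this shape, callers pass an ordinary `(RequestLocal, T) => Unit`
   lambda and no wrapper class is needed.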



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -50,28 +50,36 @@ object KafkaRequestHandler {
   }
 
   /**
-   * Wrap callback to schedule it on a request thread.
-   * NOTE: this function must be called on a request thread.
-   * @param fun Callback function to execute
-   * @return Wrapped callback that would execute `fun` on a request thread
+   * A callback method that expected to be executed once in an arbitrary request handler thread after an asynchronous action completes.
+   * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
+
+  /**
+   * Wrap callback to schedule it on an arbitrary request thread.
+   * NOTE: this function must be originally called from a request thread.
+   * @param asyncCompletionCallback a callback function to execute as the result of an asynchronous action completing
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
+   * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
+   */
+  def wrap[T](asyncCompletionCallback: AsynchronousCompletionCallback[T], requestLocal: RequestLocal): T => Unit = {

Review Comment:
   `wrap` seems unintuitive. How about something like `executeOrRegisterCallback`?
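
   To illustrate why that name might read better, here is a minimal sketch of
   the "execute or register" behavior the javadoc hints at. Everything in it
   is an assumption made for illustration: the `RequestLocal` stand-in, the
   `pending` queue, the `drain` helper, and the thread-identity check are
   hypothetical and are not taken from the actual Kafka code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for Kafka's RequestLocal, for illustration only.
final case class RequestLocal(owner: String)

object ExecuteOrRegisterSketch {
  // Hypothetical queue of callbacks waiting to be run by a request handler thread.
  val pending = new ConcurrentLinkedQueue[RequestLocal => Unit]()

  def executeOrRegisterCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit,
                                   requestLocal: RequestLocal): T => Unit = {
    val callerThread = Thread.currentThread()
    (t: T) => {
      if (Thread.currentThread() eq callerThread) {
        // Execute: still on the original thread, so its RequestLocal is valid.
        asyncCompletionCallback(requestLocal, t)
      } else {
        // Register: defer until a handler thread supplies its own RequestLocal.
        pending.add(rl => asyncCompletionCallback(rl, t))
      }
    }
  }

  // Called by a (hypothetical) handler thread; returns how many callbacks it ran.
  def drain(handlerLocal: RequestLocal): Int = {
    var ran = 0
    var cb = pending.poll()
    while (cb != null) {
      cb(handlerLocal)
      ran += 1
      cb = pending.poll()
    }
    ran
  }
}
```

   The name then describes both outcomes: the callback either runs
   immediately or is registered to run later on some request handler thread.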


