artemlivshits commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1451074358


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId      the producer id for the producer writing to the transaction
    * @param producerEpoch   the epoch of the producer writing to the transaction
    * @param baseSequence    the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal    container for the stateful instances scoped to this request -- this must correspond to the
-   *                        thread calling this method
    * @param callback        the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
    */
   def maybeStartTransactionVerificationForPartition(
     topicPartition: TopicPartition,
     transactionalId: String,
     producerId: Long,
     producerEpoch: Short,
     baseSequence: Int,
-    requestLocal: RequestLocal,
-    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+    callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-                            newRequestLocal: RequestLocal,
-                            verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-      callback(
-        preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-        newRequestLocal,
-        verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+    def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {
+      callback(results.getOrElse(topicPartition, Right(VerificationGuard.SENTINEL)))

Review Comment:
   This logic is just a translation of the current implementation (so it's 
not introducing anything new), but is it expected that the results can be 
missing for the requested topicPartition? Should we log a warning so that we 
know we're hitting an unexpected code path?
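
   To make the suggestion concrete, here is a minimal sketch of logging on the 
fallback path. All types are stand-ins defined only so the snippet compiles on 
its own, `resultFor` is a hypothetical helper, and the `warnings` buffer stands 
in for a real logger; none of this is the actual ReplicaManager code.

```scala
// Stand-in types (assumptions); the real ones live in Kafka.
final case class TopicPartition(topic: String, partition: Int)
sealed trait Errors
object Errors {
  case object NONE extends Errors
  case object NOT_COORDINATOR extends Errors
}
final case class VerificationGuard(id: Long)
object VerificationGuard { val SENTINEL: VerificationGuard = VerificationGuard(0L) }

object MissingResultSketch {
  // Collects warnings instead of calling a real logger, to keep the sketch testable.
  val warnings = scala.collection.mutable.ArrayBuffer.empty[String]

  // Hypothetical helper: look up the result and record a warning when it is absent.
  def resultFor(
    topicPartition: TopicPartition,
    results: Map[TopicPartition, Either[Errors, VerificationGuard]]
  ): Either[Errors, VerificationGuard] =
    results.getOrElse(topicPartition, {
      // getOrElse takes its default by name, so this block runs only on a miss.
      warnings += s"No verification result for $topicPartition; falling back to the SENTINEL guard"
      Right(VerificationGuard.SENTINEL)
    })
}
```

   The fallback value is unchanged; the only difference is that the unexpected 
path becomes visible.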



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId      the producer id for the producer writing to the transaction
    * @param producerEpoch   the epoch of the producer writing to the transaction
    * @param baseSequence    the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal    container for the stateful instances scoped to this request -- this must correspond to the
-   *                        thread calling this method
    * @param callback        the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
    */
   def maybeStartTransactionVerificationForPartition(
     topicPartition: TopicPartition,
     transactionalId: String,
     producerId: Long,
     producerEpoch: Short,
     baseSequence: Int,
-    requestLocal: RequestLocal,
-    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+    callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-                            newRequestLocal: RequestLocal,
-                            verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-      callback(
-        preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-        newRequestLocal,
-        verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+    def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {

Review Comment:
   I think we could translate from preAppendErrors, newRequestLocal, and 
verificationGuards here; then we'd avoid propagating the changes all the way to 
the replication layer.
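
   A rough sketch of what translating at this boundary could look like, with 
the lower layer keeping its three-argument callback. The types are stand-ins 
and `adapt` is a hypothetical name. Two assumptions to flag: the sketch drops 
newRequestLocal entirely, and when a partition has both an error and a guard it 
reports only the error.

```scala
// Stand-in types (assumptions); the real ones live in Kafka.
final case class TopicPartition(topic: String, partition: Int)
sealed trait Errors
object Errors {
  case object NONE extends Errors
  case object INVALID_TXN_STATE extends Errors
}
final case class VerificationGuard(id: Long)
object VerificationGuard { val SENTINEL: VerificationGuard = VerificationGuard(0L) }
final class RequestLocal

object TranslationSketch {
  // Hypothetical adapter: turns an Either-based callback into the old three-argument shape,
  // so only this call site needs to know about Either.
  def adapt(
    topicPartition: TopicPartition,
    callback: Either[Errors, VerificationGuard] => Unit
  ): (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, VerificationGuard]) => Unit =
    (preAppendErrors, _, verificationGuards) =>
      preAppendErrors.get(topicPartition) match {
        // Assumption: an error wins over any guard reported for the same partition.
        case Some(error) if error != Errors.NONE => callback(Left(error))
        case _ =>
          callback(Right(verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL)))
      }
}
```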



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
           producerId,
           producerEpoch,
           RecordBatch.NO_SEQUENCE,
-          requestLocal,
-          postVerificationCallback
+          // Wrap the callback to be handled on an arbitrary request handler thread
+          // when transaction verification is complete.
+          KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)

Review Comment:
   It's interesting to note (I don't think we need to change anything) that 
we'll now have a production code path (not just a unit test) where the wrapped 
callback can be called on the same request thread, so we'll go through the 
optimized code path that calls the callback directly.
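
   For readers unfamiliar with the wrapper, the same-thread fast path can be 
pictured roughly as follows. This is a simplified illustration, not the 
KafkaRequestHandler implementation: `wrapForRequestThread`, the `deferred` 
queue, and the thread-identity check are all assumptions made for the sketch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for Kafka's RequestLocal (assumption).
final class RequestLocal(val name: String)

object WrapSketch {
  // Hypothetical queue standing in for "hand the callback to some request handler thread".
  val deferred = new ConcurrentLinkedQueue[RequestLocal => Unit]()

  // Hypothetical wrapper: remembers the wrapping thread and its RequestLocal.
  def wrapForRequestThread[T](callback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val wrappingThread = Thread.currentThread()
    (value: T) =>
      if (Thread.currentThread() eq wrappingThread) {
        // Same thread: the captured RequestLocal is still valid, so call directly.
        callback(requestLocal, value)
      } else {
        // Different thread: defer, and let the executing thread supply its own RequestLocal.
        deferred.add(local => callback(local, value))
        ()
      }
  }
}
```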



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1132,36 +1123,31 @@ class ReplicaManager(val config: KafkaConfig,
    * @param transactionalId          the transactional id for the transaction
    * @param producerId               the producer id for the producer writing to the transaction
    * @param producerEpoch            the epoch of the producer writing to the transaction
-   * @param requestLocal             container for the stateful instances scoped to this request -- this must correspond to the
-   *                                 thread calling this method
    * @param callback                 the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the errors per topic partition if there were errors.
    * The callback will also be supplied the verification guards per partition if they exist. It is possible to have an
    * error and a verification guard for a topic partition if the topic partition was unable to be verified by the transaction
-   * coordinator. Transaction coordinator errors are mapped to append-friendly errors. The callback is wrapped so that it
-   * is scheduled on a request handler thread. There, it should be called with that request handler thread's thread local and
-   * not the one supplied to this method.
+   * coordinator. Transaction coordinator errors are mapped to append-friendly errors.
    */
   def maybeStartTransactionVerificationForPartitions(
     topicPartitionBatchInfo: Map[TopicPartition, Int],
     transactionalId: String,
     producerId: Long,
     producerEpoch: Short,
-    requestLocal: RequestLocal,
-    callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, VerificationGuard]) => Unit
+    callback: mutable.Map[TopicPartition, Either[Errors, VerificationGuard]] => Unit

Review Comment:
   My understanding is that once we refactor these changes, this function 
could be called either from the GroupCoordinator (GC) code path, which may not 
care about requestLocal, or from the core data path, which needs requestLocal 
because the callback may be called immediately in this thread's context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
