artemlivshits commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1451074358
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
* @param producerId the producer id for the producer writing to the
transaction
* @param producerEpoch the epoch of the producer writing to the
transaction
* @param baseSequence the base sequence of the first record in the batch
we are trying to append
- * @param requestLocal container for the stateful instances scoped to
this request -- this must correspond to the
- * thread calling this method
* @param callback the method to execute once the verification is
either completed or returns an error
*
* When the verification returns, the callback will be supplied the error if
it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the
SENTINEL verification guard will be returned.
- * This guard can not be used for verification and any appends that attenpt
to use it will fail.
+ * This guard can not be used for verification and any appends that attempt
to use it will fail.
*/
def maybeStartTransactionVerificationForPartition(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int,
- requestLocal: RequestLocal,
- callback: (Errors, RequestLocal, VerificationGuard) => Unit
+ callback: Either[Errors, VerificationGuard] => Unit
): Unit = {
- def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition,
VerificationGuard]): Unit = {
- callback(
- preAppendErrors.getOrElse(topicPartition, Errors.NONE),
- newRequestLocal,
- verificationGuards.getOrElse(topicPartition,
VerificationGuard.SENTINEL))
+ def generalizedCallback(results: Map[TopicPartition, Either[Errors,
VerificationGuard]]): Unit = {
+ callback(results.getOrElse(topicPartition,
Right(VerificationGuard.SENTINEL)))
Review Comment:
This logic is just a translation of the current implementation (so it's
not introducing anything new), but is it expected that we might not get a result
for the requested topicPartition? Should we log a warning, so that we know
when we're hitting an unexpected code path?
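To make the suggestion above concrete, here is a minimal sketch of a lookup that falls back to the sentinel guard but records a warning when the requested partition is missing. All types here (`TopicPartition`, `Errors`, `VerificationGuard`, `Sentinel`) and the `warn` and `resultFor` helpers are hypothetical stand-ins so the snippet compiles on its own; they are not the Kafka classes, and the real code would use the broker's logger.

```scala
object MissingResultSketch {
  // Hypothetical stand-ins for the Kafka types referenced in the diff.
  final case class TopicPartition(topic: String, partition: Int)
  sealed trait Errors
  case object InvalidTxnState extends Errors
  final case class VerificationGuard(id: Long)
  val Sentinel: VerificationGuard = VerificationGuard(0L)

  // Collects warnings in memory so the sketch needs no logging library.
  val warnings = scala.collection.mutable.ListBuffer.empty[String]
  def warn(msg: String): Unit = warnings += msg

  // Map.getOrElse takes its default by name, so the warning is only
  // recorded when the partition is actually absent from the results.
  def resultFor(
    topicPartition: TopicPartition,
    results: Map[TopicPartition, Either[Errors, VerificationGuard]]
  ): Either[Errors, VerificationGuard] =
    results.getOrElse(topicPartition, {
      warn(s"No verification result for $topicPartition; using the sentinel guard")
      Right(Sentinel)
    })
}
```

Whether a missing entry is actually unexpected is the open question in the comment; the sketch only shows where such a warning would sit.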
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
* @param producerId the producer id for the producer writing to the
transaction
* @param producerEpoch the epoch of the producer writing to the
transaction
* @param baseSequence the base sequence of the first record in the batch
we are trying to append
- * @param requestLocal container for the stateful instances scoped to
this request -- this must correspond to the
- * thread calling this method
* @param callback the method to execute once the verification is
either completed or returns an error
*
* When the verification returns, the callback will be supplied the error if
it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the
SENTINEL verification guard will be returned.
- * This guard can not be used for verification and any appends that attenpt
to use it will fail.
+ * This guard can not be used for verification and any appends that attempt
to use it will fail.
*/
def maybeStartTransactionVerificationForPartition(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int,
- requestLocal: RequestLocal,
- callback: (Errors, RequestLocal, VerificationGuard) => Unit
+ callback: Either[Errors, VerificationGuard] => Unit
): Unit = {
- def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition,
VerificationGuard]): Unit = {
- callback(
- preAppendErrors.getOrElse(topicPartition, Errors.NONE),
- newRequestLocal,
- verificationGuards.getOrElse(topicPartition,
VerificationGuard.SENTINEL))
+ def generalizedCallback(results: Map[TopicPartition, Either[Errors,
VerificationGuard]]): Unit = {
Review Comment:
I think we could do the translation from preAppendErrors, newRequestLocal, and
verificationGuards here; then we'd avoid propagating the changes all the way to
the replication layer.
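A rough sketch of what such a local translation might look like, assuming the lower layer keeps its three-argument callback shape and only this method exposes the `Either`. The types below are hypothetical stand-ins, `translate` and `adapt` are illustrative names, and two choices are assumptions rather than anything the PR states: an error takes precedence when both an error and a guard are present, and `newRequestLocal` is simply dropped.

```scala
object TranslateSketch {
  // Hypothetical stand-ins for the Kafka types referenced in the diff.
  final case class TopicPartition(topic: String, partition: Int)
  final class RequestLocal
  sealed trait Errors
  object Errors {
    case object NONE extends Errors
    case object INVALID_TXN_STATE extends Errors
  }
  final case class VerificationGuard(id: Long)
  object VerificationGuard {
    val Sentinel: VerificationGuard = VerificationGuard(0L)
  }

  // Collapse the two per-partition maps into one Either for a single partition.
  // Assumption: a non-NONE error wins over any guard for the same partition.
  def translate(
    topicPartition: TopicPartition,
    preAppendErrors: Map[TopicPartition, Errors],
    verificationGuards: Map[TopicPartition, VerificationGuard]
  ): Either[Errors, VerificationGuard] =
    preAppendErrors.get(topicPartition) match {
      case Some(error) if error != Errors.NONE => Left(error)
      case _ => Right(verificationGuards.getOrElse(topicPartition, VerificationGuard.Sentinel))
    }

  // Adapts an Either-based callback to the older three-argument shape,
  // so only this boundary needs to know about the new signature.
  def adapt(
    topicPartition: TopicPartition,
    callback: Either[Errors, VerificationGuard] => Unit
  ): (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, VerificationGuard]) => Unit =
    (preAppendErrors, _, verificationGuards) =>
      callback(translate(topicPartition, preAppendErrors, verificationGuards))
}
```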
##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
producerId,
producerEpoch,
RecordBatch.NO_SEQUENCE,
- requestLocal,
- postVerificationCallback
+ // Wrap the callback to be handled on an arbitrary request handler
thread
+ // when transaction verification is complete.
+ KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback,
requestLocal)
Review Comment:
It's interesting to note (I don't think we need to change anything) that
we'll now have a production code path (and not just a unit test) where the
wrapped callback can be called on the same request thread, so we'll go through
the optimized code path where we call the callback directly.
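For readers unfamiliar with the pattern, the following is a simplified, hypothetical illustration of a callback wrapper with a same-thread fast path. It is not the implementation of `KafkaRequestHandler.wrapAsyncCallback`; `RequestLocal`, `pending`, `wrap`, and `drain` are made-up names, and the queue merely stands in for "reschedule on some request handler thread".

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object WrapCallbackSketch {
  // Hypothetical stand-in for per-thread request state.
  final class RequestLocal(val name: String)

  // Stand-in for handing work to another request handler thread.
  val pending = new ConcurrentLinkedQueue[RequestLocal => Unit]()

  // Wraps a callback so that it runs directly when invoked on the thread
  // that created the wrapper, and is queued for a handler thread otherwise.
  def wrap[T](callback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
    val originThread = Thread.currentThread()
    (value: T) => {
      if (Thread.currentThread() eq originThread) {
        // Fast path: same thread, so the original requestLocal is still valid.
        callback(requestLocal, value)
      } else {
        // Slow path: defer, and let the handler thread supply its own local.
        pending.add(handlerLocal => callback(handlerLocal, value))
        ()
      }
    }
  }

  // Simulates a handler thread picking up the deferred callbacks.
  def drain(handlerLocal: RequestLocal): Unit = {
    var next = pending.poll()
    while (next != null) {
      next(handlerLocal)
      next = pending.poll()
    }
  }
}
```

In this sketch, a verification that completes synchronously would take the fast path, which is the situation the comment describes for the new production call site.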
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1132,36 +1123,31 @@ class ReplicaManager(val config: KafkaConfig,
* @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing
to the transaction
* @param producerEpoch the epoch of the producer writing to the
transaction
- * @param requestLocal container for the stateful instances
scoped to this request -- this must correspond to the
- * thread calling this method
* @param callback the method to execute once the
verification is either completed or returns an error
*
* When the verification returns, the callback will be supplied the errors
per topic partition if there were errors.
* The callback will also be supplied the verification guards per partition
if they exist. It is possible to have an
* error and a verification guard for a topic partition if the topic
partition was unable to be verified by the transaction
- * coordinator. Transaction coordinator errors are mapped to append-friendly
errors. The callback is wrapped so that it
- * is scheduled on a request handler thread. There, it should be called with
that request handler thread's thread local and
- * not the one supplied to this method.
+ * coordinator. Transaction coordinator errors are mapped to append-friendly
errors.
*/
def maybeStartTransactionVerificationForPartitions(
topicPartitionBatchInfo: Map[TopicPartition, Int],
transactionalId: String,
producerId: Long,
producerEpoch: Short,
- requestLocal: RequestLocal,
- callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition,
VerificationGuard]) => Unit
+ callback: mutable.Map[TopicPartition, Either[Errors, VerificationGuard]]
=> Unit
Review Comment:
My understanding is that once we refactor these changes, this function could
be called either from the GC code path (which may not care about requestLocal) or
from the core data path, which needs requestLocal because the callback may be
called immediately in this thread's context.