dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447124222
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
))
}
+ /**
+ * Verify the transaction.
+ *
+ * @param tp The partition to write records to.
+ * @param transactionalId The transactional id.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.
+ * @throws KafkaException Any KafkaException caught during the operation.
+ */
+ override def maybeStartTransactionVerification(
+ tp: TopicPartition,
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short
+ ): CompletableFuture[VerificationGuard] = {
+ val future = new CompletableFuture[VerificationGuard]()
+ replicaManager.maybeStartTransactionVerificationForPartition(
+ topicPartition = tp,
+ transactionalId = transactionalId,
+ producerId = producerId,
+ producerEpoch = producerEpoch,
+ baseSequence = RecordBatch.NO_SEQUENCE,
+ requestLocal = RequestLocal.NoCaching,
+ callback = (error, _, verificationGuard) => {
+ if (error != Errors.NONE) {
+ future.completeExceptionally(error.exception)
+ } else {
+ future.complete(verificationGuard)
+ }
+ }
+ )
+ future
+ }
+
private def internalAppend(
tp: TopicPartition,
- memoryRecords: MemoryRecords
+ memoryRecords: MemoryRecords,
+ verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
): Long = {
var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
- replicaManager.appendRecords(
+ replicaManager.appendForGroup(
timeout = 0L,
requiredAcks = 1,
- internalTopicsAllowed = true,
- origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> memoryRecords),
responseCallback = results => appendResults = results,
+ requestLocal = RequestLocal.NoCaching,
Review Comment:
> Right now though, wrap only schedules to the request thread if we are
> already on a request thread. Otherwise we execute directly. If we start
> verification from a non-request handler thread, maybe this already works as you
> intend.

My understanding is that `wrapAsyncCallback` will throw an exception if the
caller is not a request thread and `bypassThreadCheck` is not set.
In my case, the verification is called from the request thread. I only
re-schedule to the coordinator thread when the verification returns. I could
also have scheduled the verification from the coordinator thread but it does
not seem necessary.

> Alternatively, I could pass in a parameter to optionally wrap the callback
> (send it to the request thread) or not.

Yeah, that's an option. I experimented with this idea this morning and the
result was a bit confusing. Another option would be to do the re-scheduling on
the caller side. I will keep experimenting. I think that we could address this
in a subsequent PR to get rid of the extra hop. This PR works as-is but is
suboptimal.
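
To make the "re-scheduling on the caller side" option more concrete, here is a rough, self-contained sketch. It is not code from this PR: `Guard`, `verify`, `verifyThenAppend` and the single-threaded `coordinatorExecutor` are hypothetical stand-ins for `VerificationGuard`, the callback-based verification call and the coordinator thread. It only illustrates the pattern of adapting the callback into a `CompletableFuture` and letting the caller pick the thread on which the continuation runs.

```scala
import java.util.concurrent.{CompletableFuture, ExecutorService}
import java.util.function.{Function => JFunction}

object RescheduleSketch {
  // Hypothetical stand-in for VerificationGuard.
  final case class Guard(id: Long)

  // Hypothetical stand-in for a callback-based verification API.
  // For simplicity it invokes the callback on the calling thread.
  def verify(producerId: Long, callback: (Option[Throwable], Guard) => Unit): Unit =
    callback(None, Guard(producerId))

  // Adapts the callback into a future, then hops to the executor chosen by
  // the caller (assumed here to model the coordinator thread).
  def verifyThenAppend(
    producerId: Long,
    coordinatorExecutor: ExecutorService
  ): CompletableFuture[String] = {
    val verified = new CompletableFuture[Guard]()
    verify(producerId, (error, guard) => error match {
      case Some(e) => verified.completeExceptionally(e)
      case None => verified.complete(guard)
    })

    // Placeholder for the append; it reports the thread it ran on.
    val append: JFunction[Guard, String] =
      guard => s"${Thread.currentThread.getName}:${guard.id}"

    // thenApplyAsync always submits the continuation to the given executor,
    // even if the future is already complete, so the hop is decided here on
    // the caller side rather than inside the verification callback.
    verified.thenApplyAsync(append, coordinatorExecutor)
  }
}
```

With an executor whose only thread is named `coordinator`, `RescheduleSketch.verifyThenAppend(42L, executor).get()` yields `"coordinator:42"`, i.e. the continuation ran on the executor thread and not on the thread that started the verification.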