jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445444996
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
))
}
+ /**
+ * Verify the transaction.
+ *
+ * @param tp The partition to write records to.
+ * @param transactionalId The transactional id.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.
+ * @throws KafkaException Any KafkaException caught during the operation.
+ */
+ override def maybeStartTransactionVerification(
+ tp: TopicPartition,
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short
+ ): CompletableFuture[VerificationGuard] = {
+ val future = new CompletableFuture[VerificationGuard]()
+ replicaManager.maybeStartTransactionVerificationForPartition(
Review Comment:
One thing to note: we skip verification when the partition has already been
verified for the transaction. I guess in that case we return the sentinel
guard, which works 👍
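
To make the case concrete, here is a minimal toy sketch of the control flow I have in mind. It is not the actual `ReplicaManager` or `VerificationGuard` code: `Guard`, `Guard.Sentinel`, and `ToyVerifier` are hypothetical stand-ins, and tracking verified partitions in a set is an assumption made only for illustration.

```scala
import java.util.concurrent.CompletableFuture
import scala.collection.mutable

// Hypothetical stand-in for a verification guard; not Kafka's real class.
final case class Guard(id: Long)

object Guard {
  // Hypothetical sentinel meaning "no new verification was started".
  val Sentinel: Guard = Guard(0L)
}

// Toy model only: the state tracking here is an assumption for illustration.
class ToyVerifier {
  private val verified = mutable.Set.empty[(String, Long)]
  private var nextId = 1L

  def maybeStartVerification(partition: String, producerId: Long): CompletableFuture[Guard] = {
    val future = new CompletableFuture[Guard]()
    val key = (partition, producerId)
    if (verified.contains(key)) {
      // Already verified for this transaction: skip and hand back the sentinel.
      future.complete(Guard.Sentinel)
    } else {
      // First time we see this partition: record it and return a fresh guard.
      verified += key
      future.complete(Guard(nextId))
      nextId += 1
    }
    future
  }
}
```

With this sketch, the first call for a given partition and producer yields a fresh guard and a repeat call yields `Guard.Sentinel`, so the caller can treat both outcomes as a successfully completed future.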