jolshan commented on code in PR #15087: URL: https://github.com/apache/kafka/pull/15087#discussion_r2298430334
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -762,167 +763,123 @@ class ReplicaManager(val config: KafkaConfig, delayedProduceLock: Option[Lock] = None, recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, - transactionalId: String = null, - actionQueue: ActionQueue = this.defaultActionQueue): Unit = { - if (isValidRequiredAcks(requiredAcks)) { - - val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]() - val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) = - if (transactionalId == null || !config.transactionPartitionVerificationEnable) - (entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], Map.empty[TopicPartition, Errors]) - else { - val verifiedEntries = mutable.Map[TopicPartition, MemoryRecords]() - val unverifiedEntries = mutable.Map[TopicPartition, MemoryRecords]() - val errorEntries = mutable.Map[TopicPartition, Errors]() - partitionEntriesForVerification(verificationGuards, entriesPerPartition, verifiedEntries, unverifiedEntries, errorEntries) - (verifiedEntries.toMap, unverifiedEntries.toMap, errorEntries.toMap) - } - - if (notYetVerifiedEntriesPerPartition.isEmpty || addPartitionsToTxnManager.isEmpty) { - appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap, - errorsPerPartition, recordValidationStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty) - } else { - // For unverified entries, send a request to verify. When verified, the append process will proceed via the callback. - // We verify above that all partitions use the same producer ID. 
- val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch() - addPartitionsToTxnManager.foreach(_.verifyTransaction( - transactionalId = transactionalId, - producerId = batchInfo.producerId, - producerEpoch = batchInfo.producerEpoch, - topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq, - callback = KafkaRequestHandler.wrapAsyncCallback( - appendEntries( - entriesPerPartition, - internalTopicsAllowed, - origin, - requiredAcks, - verificationGuards.toMap, - errorsPerPartition, - recordValidationStatsCallback, - timeout, - responseCallback, - delayedProduceLock, - actionQueue - ), - requestLocal) - )) - } - } else { - // If required.acks is outside accepted range, something is wrong with the client - // Just return an error and don't handle the request at all - val responseStatus = entriesPerPartition.map { case (topicPartition, _) => - topicPartition -> new PartitionResponse( - Errors.INVALID_REQUIRED_ACKS, - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset, - RecordBatch.NO_TIMESTAMP, - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset - ) - } - responseCallback(responseStatus) + actionQueue: ActionQueue = this.defaultActionQueue, + verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { + if (!isValidRequiredAcks(requiredAcks)) { + sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback) + return } - } - /* - * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal - * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. 
- */ - private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], - internalTopicsAllowed: Boolean, - origin: AppendOrigin, - requiredAcks: Short, - verificationGuards: Map[TopicPartition, VerificationGuard], - errorsPerPartition: Map[TopicPartition, Errors], - recordConversionStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit, - timeout: Long, - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Lock], - actionQueue: ActionQueue) - (requestLocal: RequestLocal, unverifiedEntries: Map[TopicPartition, Errors]): Unit = { val sTime = time.milliseconds - val verifiedEntries = - if (unverifiedEntries.isEmpty) - allEntries - else - allEntries.filter { case (tp, _) => - !unverifiedEntries.contains(tp) - } - val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, - origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) + origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - val errorResults = (unverifiedEntries ++ errorsPerPartition).map { - case (topicPartition, error) => - // translate transaction coordinator errors to known producer response errors - val customException = - error match { - case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction")) - case Errors.CONCURRENT_TRANSACTIONS | - Errors.COORDINATOR_LOAD_IN_PROGRESS | - Errors.COORDINATOR_NOT_AVAILABLE | - Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException( - s"Unable to verify the partition has been added to the transaction. 
Underlying error: ${error.toString}")) - case _ => None - } - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(customException.getOrElse(error.exception)), - hasCustomErrorMessage = customException.isDefined - ) - } - - val allResults = localProduceResults ++ errorResults - val produceStatus = buildProducePartitionStatus(allResults) + val produceStatus = buildProducePartitionStatus(localProduceResults) - addCompletePurgatoryAction(actionQueue, allResults) - recordConversionStatsCallback(localProduceResults.map { case (k, v) => + addCompletePurgatoryAction(actionQueue, localProduceResults) + recordValidationStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats }) maybeAddDelayedProduce( requiredAcks, delayedProduceLock, timeout, - allEntries, - allResults, + entriesPerPartition, + localProduceResults, produceStatus, responseCallback ) } - private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard], - entriesPerPartition: Map[TopicPartition, MemoryRecords], - verifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - errorEntries: mutable.Map[TopicPartition, Errors]): Unit = { - val transactionalProducerIds = mutable.HashSet[Long]() + /** + * Handles the produce request by starting any transactional verification before appending. + * + * @param timeout maximum time we will wait to append before returning + * @param requiredAcks number of replicas who must acknowledge the append before sending the response + * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to + * @param transactionalId the transactional ID for the produce request or null if there is none. 
+ * @param entriesPerPartition the records per partition to be appended + * @param responseCallback callback for sending the response + * @param recordValidationStatsCallback callback for updating stats on record conversions + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method + * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. + */ + def handleProduceAppend(timeout: Long, + requiredAcks: Short, + internalTopicsAllowed: Boolean, + transactionalId: String, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit, + recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), + requestLocal: RequestLocal = RequestLocal.NoCaching, + actionQueue: ActionQueue = this.defaultActionQueue): Unit = { + + val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() + val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() entriesPerPartition.foreach { case (topicPartition, records) => - try { - // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. - val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) - transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId)) - - if (transactionalBatches.nonEmpty) { - // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify. 
- val firstBatch = records.firstBatch - val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch) - if (verificationGuard != VerificationGuard.SENTINEL) { - verificationGuards.put(topicPartition, verificationGuard) - unverifiedEntries.put(topicPartition, records) - } else - verifiedEntries.put(topicPartition, records) - } else { - // If there is no producer ID or transactional records in the batches, no need to verify. - verifiedEntries.put(topicPartition, records) - } - } catch { - case e: Exception => errorEntries.put(topicPartition, Errors.forException(e)) - } + // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. + val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) + transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) + if (!transactionalBatches.isEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) } - // We should have exactly one producer ID for transactional records - if (transactionalProducerIds.size > 1) { + if (transactionalProducerInfo.size > 1) { throw new InvalidPidMappingException("Transactional records contained more than one producer ID") } + + def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors], + newRequestLocal: RequestLocal, + verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { + val errorResults = preAppendErrors.map { + case (topicPartition, error) => + // translate transaction coordinator errors to known producer response errors + val customException = + error match { + case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction")) + case Errors.CONCURRENT_TRANSACTIONS | + Errors.COORDINATOR_LOAD_IN_PROGRESS | + 
Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException( + s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")) + case _ => None + } + topicPartition -> LogAppendResult( + LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, + Some(customException.getOrElse(error.exception)), + hasCustomErrorMessage = customException.isDefined + ) + } + val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) } Review Comment: Yeah, we could probably do this filtering only when errorResults is non-empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org