jeffkbkim commented on code in PR #12901:
URL: https://github.com/apache/kafka/pull/12901#discussion_r1068928367
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2464,90 +2464,108 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleTxnOffsetCommitRequest(request: RequestChannel.Request,
requestLocal: RequestLocal): Unit = {
+ def handleTxnOffsetCommitRequest(
+ request: RequestChannel.Request,
+ requestLocal: RequestLocal
+ ): CompletableFuture[Unit] = {
ensureInterBrokerVersion(IBP_0_11_0_IV0)
- val header = request.header
val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
+ def sendResponse(response: TxnOffsetCommitResponse): Unit = {
+ // We need to replace COORDINATOR_LOAD_IN_PROGRESS with
COORDINATOR_NOT_AVAILABLE
+ // for older producer client from 0.11 to prior 2.0, which could
potentially crash due
+ // to unexpected loading error. This bug is fixed later by KAFKA-7296.
Clients using
+ // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to
have
+ // the fix to check for the loading error.
+ if (txnOffsetCommitRequest.version < 2) {
+ response.data.topics.forEach { topic =>
+ topic.partitions.forEach { partition =>
+ if (partition.errorCode ==
Errors.COORDINATOR_LOAD_IN_PROGRESS.code) {
+ partition.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+ }
+ }
+ }
+ }
+
+ requestHelper.sendMaybeThrottle(request, response)
+ }
+
// authorize for the transactionalId and the consumer group. Note that we
skip producerId authorization
// since it is implied by transactionalId authorization
- if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID,
txnOffsetCommitRequest.data.transactionalId))
- requestHelper.sendErrorResponseMaybeThrottle(request,
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
- else if (!authHelper.authorize(request.context, READ, GROUP,
txnOffsetCommitRequest.data.groupId))
- requestHelper.sendErrorResponseMaybeThrottle(request,
Errors.GROUP_AUTHORIZATION_FAILED.exception)
- else {
- val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
- val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
- val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition,
TxnOffsetCommitRequest.CommittedOffset]()
- val committedOffsets = txnOffsetCommitRequest.offsets.asScala
- val authorizedTopics = authHelper.filterByAuthorized(request.context,
READ, TOPIC, committedOffsets)(_._1.topic)
-
- for ((topicPartition, commitedOffset) <- committedOffsets) {
- if (!authorizedTopics.contains(topicPartition.topic))
- unauthorizedTopicErrors += topicPartition ->
Errors.TOPIC_AUTHORIZATION_FAILED
- else if (!metadataCache.contains(topicPartition))
- nonExistingTopicErrors += topicPartition ->
Errors.UNKNOWN_TOPIC_OR_PARTITION
- else
- authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)
- }
+ if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID,
txnOffsetCommitRequest.data.transactionalId)) {
+
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
txnOffsetCommitRequest.data.groupId)) {
+
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val authorizedTopics = authHelper.filterByAuthorized(
+ request.context,
+ READ,
+ TOPIC,
+ txnOffsetCommitRequest.data.topics.asScala
+ )(_.name)
+
+ val responseBuilder = new TxnOffsetCommitResponse.Builder()
+ val authorizedTopicCommittedOffsets = new
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
+ txnOffsetCommitRequest.data.topics.forEach { topic =>
+ if (!authorizedTopics.contains(topic.name)) {
+ // If the topic is not authorized, we add the topic and all its
partitions
+ // to the response with TOPIC_AUTHORIZATION_FAILED.
+
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
+ topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ } else if (!metadataCache.contains(topic.name)) {
+ // If the topic is unknown, we add the topic and all its partitions
+ // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
+ topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ } else {
+ // Otherwise, we check all partitions to ensure that they all exist.
+ val topicWithValidPartitions = new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
- // the callback for sending an offset commit response
- def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition,
Errors]): Unit = {
- val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++=
unauthorizedTopicErrors ++= nonExistingTopicErrors
- if (isDebugEnabled)
- combinedCommitStatus.forKeyValue { (topicPartition, error) =>
- if (error != Errors.NONE) {
- debug(s"TxnOffsetCommit with correlation id
${header.correlationId} from client ${header.clientId} " +
- s"on partition $topicPartition failed due to
${error.exceptionName}")
+ topic.partitions.forEach { partition =>
+ if (metadataCache.getPartitionInfo(topic.name,
partition.partitionIndex).nonEmpty) {
+ topicWithValidPartitions.partitions.add(partition)
+ } else {
+ responseBuilder.addPartition(topic.name,
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}
- // We need to replace COORDINATOR_LOAD_IN_PROGRESS with
COORDINATOR_NOT_AVAILABLE
- // for older producer client from 0.11 to prior 2.0, which could
potentially crash due
- // to unexpected loading error. This bug is fixed later by KAFKA-7296.
Clients using
- // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed
to have
- // the fix to check for the loading error.
- if (txnOffsetCommitRequest.version < 2) {
- combinedCommitStatus ++= combinedCommitStatus.collect {
- case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS
=> tp -> Errors.COORDINATOR_NOT_AVAILABLE
+ if (!topicWithValidPartitions.partitions.isEmpty) {
+ authorizedTopicCommittedOffsets += topicWithValidPartitions
}
}
-
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new TxnOffsetCommitResponse(requestThrottleMs,
combinedCommitStatus.asJava))
}
- if (authorizedTopicCommittedOffsets.isEmpty)
- sendResponseCallback(Map.empty)
- else {
- val offsetMetadata =
convertTxnOffsets(authorizedTopicCommittedOffsets.toMap)
- groupCoordinator.handleTxnCommitOffsets(
- txnOffsetCommitRequest.data.groupId,
- txnOffsetCommitRequest.data.producerId,
- txnOffsetCommitRequest.data.producerEpoch,
- txnOffsetCommitRequest.data.memberId,
- Option(txnOffsetCommitRequest.data.groupInstanceId),
- txnOffsetCommitRequest.data.generationId,
- offsetMetadata,
- sendResponseCallback,
- requestLocal)
+ if (authorizedTopicCommittedOffsets.isEmpty) {
+ sendResponse(responseBuilder.build())
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData()
Review Comment:
   I agree — it doesn't seem ideal. What is the reason for not reusing the
original request data and applying `setTopics()` to it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]