chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1666735395
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") + } + + val groupId = shareFetchRequest.data.groupId + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data.topics.forEach ( topic => { + breakable{ + topic.partitions.forEach ( partition => { + if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) { + isAcknowledgeDataPresent = true + break + } else { + isAcknowledgeDataPresent = false + } + }) + } + }) + isAcknowledgeDataPresent + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() + + def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data. + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { + return true + } + false + } + + val topicNames = metadataCache.topicIdsToNames() + val shareFetchData = shareFetchRequest.shareFetchData(topicNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames) + + val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext : ShareFetchContext = null + + var shareFetchResponse : ShareFetchResponse = null + + def updateConversionStats(send: Send): Unit = { + send match { + case send: MultiRecordsSend if send.recordConversionStats != null => + send.recordConversionStats.asScala.toMap.foreach { + case (tp, stats) => updateRecordConversionStats(request, tp, stats) + } + case send: NetworkSend => + updateConversionStats(send.send()) + case _ => + } + } + + // check if the Request is Invalid. If it is, the request is failed directly here. + if(isInvalidShareFetchRequest()) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. + shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata) + } catch { + case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) + CompletableFuture.completedFuture[Unit](()) + return + } + + // Variable to store any error thrown while the handling piggybacked acknowledgements. + var acknowledgeError : Errors = Errors.NONE + // Variable to store the topic partition wise result of piggybacked acknowledgements. + var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + + val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions + val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set() + erroneousAndValidPartitionData.erroneous.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + shareFetchData.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + topicIdPartitionSeq + )(_.topicPartition.topic) + + // Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an + // Initial ShareFetch Request, otherwise the request would have been invalid. + if(isAcknowledgeDataPresent) { + if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED + } else { + acknowledgeResult = handleAcknowledgements(request, topicNames, sharePartitionManager, authorizedTopics, groupId, memberId, true) + } + } + + // Handling the Fetch from the ShareFetchRequest. + try { + shareFetchResponse = handleFetchFromShareFetchRequest( + request, + erroneousAndValidPartitionData, + topicNames, + sharePartitionManager, + shareFetchContext, + authorizedTopics + ) + } catch { + case throwable : Throwable => + debug(s"Share fetch request with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + requestHelper.handleError(request, throwable) + return + } + + def combineShareFetchAndShareAcknowledgeResponses( + shareFetchResponse: ShareFetchResponse, + acknowledgeResult : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + acknowledgeError : Errors + ) : ShareFetchResponse = { + + // The outer map has topicId as the key and the inner map has partitionIndex as the key. + val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map() + if(acknowledgeResult != null && acknowledgeResult.nonEmpty) { + acknowledgeResult.asJava.forEach { (tp, partitionData) => + topicPartitionAcknowledgements.get(tp.topicId) match { + case Some(subMap) => + subMap += tp.partition -> partitionData.errorCode + case None => + val partitionAcknowledgementsMap : mutable.Map[Int, Short] = mutable.Map() + partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode + topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap + } + } + } + + shareFetchResponse.data.responses.forEach(topic => { + val topicId = topic.topicId + topicPartitionAcknowledgements.get(topicId) match { + case Some(subMap) => + topic.partitions.forEach { partition => + subMap.get(partition.partitionIndex) match { + case Some(value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + partition.setAcknowledgeErrorCode(ackErrorCode) + // Delete the element. + subMap.remove(partition.partitionIndex) + case None => + } + } + // Add the remaining acknowledgements. + subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topic.partitions.add(fetchPartitionData) + } + topicPartitionAcknowledgements.remove(topicId) + case None => + } + }) + // Add the remaining acknowledgements. + topicPartitionAcknowledgements.foreach{ case(topicId, subMap) => + val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse() + .setTopicId(topicId) + subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topicData.partitions.add(fetchPartitionData) + } + shareFetchResponse.data.responses.add(topicData) + } + + if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + sharePartitionManager.releaseAcquiredRecords(groupId, memberId). + whenComplete((releaseAcquiredRecordsData, throwable) => { + if (throwable != null) { + debug(s"Release acquired records on share session close with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + requestHelper.handleError(request, throwable) Review Comment: Yep, this doesn't make sense. If there is an error here, the broker will send out 2 responses for that. I have changed this to the following logic - 1) if throwable is not null, it will simply throw the throwable. 2) I have surrounded the call of `combineShareFetchAndShareAcknowledgeResponses` with a try catch. If error is not thrown then `requestChannel.sendResponse` with appropriate arguments. If error is thrown, then `requestHelper.handleError(request, throwable)` with appropriate arguments. Also change the log from debug to error here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org