chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1666735395


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not 
been set explicitly.
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None => throw new IllegalStateException("ShareFetchRequest received 
but SharePartitionManager is not initialized")
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+      var isAcknowledgeDataPresent = false
+      shareFetchRequest.data.topics.forEach ( topic => {
+        breakable{
+          topic.partitions.forEach ( partition => {
+            if (partition.acknowledgementBatches != null && 
!partition.acknowledgementBatches.isEmpty) {
+              isAcknowledgeDataPresent = true
+              break
+            } else {
+              isAcknowledgeDataPresent = false
+            }
+          })
+        }
+      })
+      isAcknowledgeDataPresent
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+
+    def isInvalidShareFetchRequest() : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+        return true
+      }
+      false
+    }
+
+    val topicNames = metadataCache.topicIdsToNames()
+    val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+
+    val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+    var shareFetchContext : ShareFetchContext = null
+
+    var shareFetchResponse : ShareFetchResponse = null
+
+    def updateConversionStats(send: Send): Unit = {
+      send match {
+        case send: MultiRecordsSend if send.recordConversionStats != null =>
+          send.recordConversionStats.asScala.toMap.foreach {
+            case (tp, stats) => updateRecordConversionStats(request, tp, stats)
+          }
+        case send: NetworkSend =>
+          updateConversionStats(send.send())
+        case _ =>
+      }
+    }
+
+    // check if the Request is Invalid. If it is, the request is failed 
directly here.
+    if(isInvalidShareFetchRequest()) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.INVALID_REQUEST.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+
+    try {
+      // Creating the shareFetchContext for Share Session Handling. if context 
creation fails, the request is failed directly here.
+      shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata)
+    } catch {
+      case e: Exception => requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
+        CompletableFuture.completedFuture[Unit](())
+        return
+    }
+
+    // Variable to store any error thrown while the handling piggybacked 
acknowledgements.
+    var acknowledgeError : Errors = Errors.NONE
+    // Variable to store the topic partition wise result of piggybacked 
acknowledgements.
+    var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+    val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = 
shareFetchContext.getErroneousAndValidTopicIdPartitions
+    val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set()
+    erroneousAndValidPartitionData.erroneous.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+    shareFetchData.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+
+    val authorizedTopics = authHelper.filterByAuthorized(
+      request.context,
+      READ,
+      TOPIC,
+      topicIdPartitionSeq
+    )(_.topicPartition.topic)
+
+    // Handling the Acknowledgements from the ShareFetchRequest If this check 
is true, we are sure that this is not an
+    // Initial ShareFetch Request, otherwise the request would have been 
invalid.
+    if(isAcknowledgeDataPresent) {
+      if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+        acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED
+      } else {
+        acknowledgeResult = handleAcknowledgements(request, topicNames, 
sharePartitionManager, authorizedTopics, groupId, memberId, true)
+      }
+    }
+
+    // Handling the Fetch from the ShareFetchRequest.
+    try {
+      shareFetchResponse = handleFetchFromShareFetchRequest(
+        request,
+        erroneousAndValidPartitionData,
+        topicNames,
+        sharePartitionManager,
+        shareFetchContext,
+        authorizedTopics
+      )
+    } catch {
+      case throwable : Throwable =>
+        debug(s"Share fetch request with correlation from client 
${request.header.clientId}  " +
+          s"failed with error ${throwable.getMessage}")
+        requestHelper.handleError(request, throwable)
+        return
+    }
+
+    def combineShareFetchAndShareAcknowledgeResponses(
+                                                       shareFetchResponse: 
ShareFetchResponse,
+                                                       acknowledgeResult : 
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                       acknowledgeError : 
Errors
+                                                     ) : ShareFetchResponse = {
+
+      // The outer map has topicId as the key and the inner map has 
partitionIndex as the key.
+      val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, 
Short]] = mutable.Map()
+      if(acknowledgeResult != null && acknowledgeResult.nonEmpty) {
+        acknowledgeResult.asJava.forEach { (tp, partitionData) =>
+          topicPartitionAcknowledgements.get(tp.topicId) match {
+            case Some(subMap) =>
+              subMap += tp.partition -> partitionData.errorCode
+            case None =>
+              val partitionAcknowledgementsMap : mutable.Map[Int, Short] = 
mutable.Map()
+              partitionAcknowledgementsMap += tp.partition -> 
partitionData.errorCode
+              topicPartitionAcknowledgements += tp.topicId -> 
partitionAcknowledgementsMap
+          }
+        }
+      }
+
+      shareFetchResponse.data.responses.forEach(topic => {
+        val topicId = topic.topicId
+        topicPartitionAcknowledgements.get(topicId) match {
+          case Some(subMap) =>
+            topic.partitions.forEach { partition =>
+              subMap.get(partition.partitionIndex) match {
+                case Some(value) =>
+                  val ackErrorCode = if(acknowledgeError.code != 
Errors.NONE.code) acknowledgeError.code else value
+                  partition.setAcknowledgeErrorCode(ackErrorCode)
+                  // Delete the element.
+                  subMap.remove(partition.partitionIndex)
+                case None =>
+              }
+            }
+            // Add the remaining acknowledgements.
+            subMap.foreach { case (partitionIndex, value) =>
+              val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) 
acknowledgeError.code else value
+              val fetchPartitionData = new 
ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(Errors.NONE.code)
+                .setAcknowledgeErrorCode(ackErrorCode)
+              topic.partitions.add(fetchPartitionData)
+            }
+            topicPartitionAcknowledgements.remove(topicId)
+          case None =>
+        }
+      })
+      // Add the remaining acknowledgements.
+      topicPartitionAcknowledgements.foreach{ case(topicId, subMap) =>
+        val topicData = new 
ShareFetchResponseData.ShareFetchableTopicResponse()
+          .setTopicId(topicId)
+        subMap.foreach { case (partitionIndex, value) =>
+          val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) 
acknowledgeError.code else value
+          val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(ackErrorCode)
+          topicData.partitions.add(fetchPartitionData)
+        }
+        shareFetchResponse.data.responses.add(topicData)
+      }
+
+      if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+        sharePartitionManager.releaseAcquiredRecords(groupId, memberId).
+          whenComplete((releaseAcquiredRecordsData, throwable) => {
+            if (throwable != null) {
+              debug(s"Release acquired records on share session close with 
correlation from client ${request.header.clientId}  " +
+                s"failed with error ${throwable.getMessage}")
+              requestHelper.handleError(request, throwable)

Review Comment:
   Yep, this doesn't make sense. If there is an error here, the broker will 
send out 2 responses for that. I have changed this to the following logic - 
   1) if throwable is not null, it will simply throw the throwable.
   2) I have surrounded the call of 
`combineShareFetchAndShareAcknowledgeResponses` with a try catch. If error is 
not thrown then `requestChannel.sendResponse` with appropriate arguments. If 
error is thrown, then `requestHelper.handleError(request, throwable)` with 
appropriate arguments.
   
   Also change the log from debug to error here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to