chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1697502447
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4069,6 +4431,211 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + private def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicIdNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicIdNames.containsKey(topic.topicId)) { + topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => { + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + }) + } + else { + topic.partitions().forEach ( partition => { + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex()) + ) + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + partition.acknowledgementBatches().forEach( batch => { + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + }) + acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches + }) + } + }) + acknowledgeBatchesMap + } + + def validateAcknowledgementBatches( + acknowledgementDataFromRequest: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] + ): mutable.Set[TopicIdPartition] = { + val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = 
mutable.Set.empty[TopicIdPartition] + + acknowledgementDataFromRequest.foreach { case (tp: TopicIdPartition, acknowledgeBatches: util.List[ShareAcknowledgementBatch]) => + var prevEndOffset = -1L + var isErroneous = false + acknowledgeBatches.forEach { batch => + if (!isErroneous) { + if (batch.firstOffset > batch.lastOffset) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.firstOffset < prevEndOffset) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes == null || batch.acknowledgeTypes.isEmpty) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes.size() > 1 && batch.lastOffset - batch.firstOffset != batch.acknowledgeTypes.size() - 1) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3)) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else { + prevEndOffset = batch.lastOffset + } + } + } + } + + erroneousTopicIdPartitions + } + + // the callback for processing a share fetch response, invoked before throttling. 
+ def processShareFetchResponse( + responsePartitionData: mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData], + request: RequestChannel.Request, + topicIdNames : util.Map[Uuid, String], + shareFetchContext : ShareFetchContext + ): ShareFetchResponse = { + + val clientId = request.header.clientId + val versionId = request.header.apiVersion + val shareFetchRequest = request.body[ShareFetchRequest] + val groupId = shareFetchRequest.data.groupId + val memberId = shareFetchRequest.data.memberId + + val partitions = new util.LinkedHashMap[TopicIdPartition, ShareFetchResponseData.PartitionData] + val nodeEndpoints = new mutable.HashMap[Int, Node] + responsePartitionData.foreach { case(tp, partitionData) => + partitionData.errorCode match { + case errCode if errCode == Errors.NOT_LEADER_OR_FOLLOWER.code | errCode == Errors.FENCED_LEADER_EPOCH.code => + val leaderNode = getCurrentLeader(tp.topicPartition, request.context.listenerName) + leaderNode.node.foreach { node => + nodeEndpoints.put(node.id, node) + } + partitionData.currentLeader + .setLeaderId(leaderNode.leaderId) + .setLeaderEpoch(leaderNode.leaderEpoch) + case _ => + } + + partitions.put(tp, partitionData) Review Comment: Thanks for the review. Yes, we could use the same, but the definitions of some methods of shareFetchContext require a util.LinkedHashMap, so we would need a new variable anyway to store the converted map, as it is required as an argument to multiple methods. As for why we need a util.LinkedHashMap at all, maybe we could change those method signatures to use a Scala map as well, but I think that would be out of scope for this PR, as it would involve making changes to others' code as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org