YutaLin commented on code in PR #19630: URL: https://github.com/apache/kafka/pull/19630#discussion_r2080868631
########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, .setPartition(tp.partition) .setErrorCode(Errors.forException(t).code) } - } + }.asJava } - override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + override def buildFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = { // Only include replica in the fetch request if it is not throttled. if (quota.isQuotaExceeded) { - ResultWithPartitions(None, Set.empty) + new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]()) } else { - selectPartitionToFetch(partitions) match { - case Some((tp, fetchState)) => - buildFetchForPartition(tp, fetchState) - case None => - ResultWithPartitions(None, Set.empty) + val selectPartition = selectPartitionToFetch(partitions) + if (selectPartition.isPresent) { + val (tp, fetchState) = selectPartition.get() + buildFetchForPartition(tp, fetchState) + } else { + new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]()) } } } - private def selectPartitionToFetch(partitions: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { + private def selectPartitionToFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = { // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it // becomes unavailable or is removed. 
inProgressPartition.foreach { tp => - val fetchStateOpt = partitions.get(tp) + val fetchStateOpt = Option(partitions.get(tp)) fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState => - return Some((tp, fetchState)) + return Optional.of((tp, fetchState)) } } inProgressPartition = None - val nextPartitionOpt = nextReadyPartition(partitions) + val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap) nextPartitionOpt.foreach { case (tp, fetchState) => - inProgressPartition = Some(tp) + inProgressPartition = Option(tp) info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " + s"Including this partition, there are ${partitions.size} remaining partitions to copy by this thread.") + return Optional.of((tp, fetchState)) } - nextPartitionOpt + Optional.empty() Review Comment: It's a mistake, I've updated the whole method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org