YutaLin commented on code in PR #19630:
URL: https://github.com/apache/kafka/pull/19630#discussion_r2080868631


##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             .setPartition(tp.partition)
             .setErrorCode(Errors.forException(t).code)
       }
-    }
+    }.asJava
   }
 
-  override def buildFetch(partitions: Map[TopicPartition, 
PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+  override def buildFetch(partitions: util.Map[TopicPartition, 
PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
-      ResultWithPartitions(None, Set.empty)
+      new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), 
util.Collections.emptySet[TopicPartition]())
     } else {
-      selectPartitionToFetch(partitions) match {
-        case Some((tp, fetchState)) =>
-          buildFetchForPartition(tp, fetchState)
-        case None =>
-          ResultWithPartitions(None, Set.empty)
+      val selectPartition = selectPartitionToFetch(partitions)
+      if (selectPartition.isPresent) {
+        val (tp, fetchState) = selectPartition.get()
+        buildFetchForPartition(tp, fetchState)
+      } else {
+        new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), 
util.Collections.emptySet[TopicPartition]())
       }
     }
   }
 
-  private def selectPartitionToFetch(partitions: Map[TopicPartition, 
PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+  private def selectPartitionToFetch(partitions: util.Map[TopicPartition, 
PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
     // Only move one partition at a time to increase its catch-up rate and 
thus reduce the time spent on
     // moving any given replica. Replicas are selected in ascending order 
(lexicographically by topic) from the
     // partitions that are ready to fetch. Once selected, we will continue 
fetching the same partition until it
     // becomes unavailable or is removed.
 
     inProgressPartition.foreach { tp =>
-      val fetchStateOpt = partitions.get(tp)
+      val fetchStateOpt = Option(partitions.get(tp))
       fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
-        return Some((tp, fetchState))
+        return Optional.of((tp, fetchState))
       }
     }
 
     inProgressPartition = None
 
-    val nextPartitionOpt = nextReadyPartition(partitions)
+    val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
     nextPartitionOpt.foreach { case (tp, fetchState) =>
-      inProgressPartition = Some(tp)
+      inProgressPartition = Option(tp)
       info(s"Beginning/resuming copy of partition $tp from offset 
${fetchState.fetchOffset}. " +
         s"Including this partition, there are ${partitions.size} remaining 
partitions to copy by this thread.")
+      return Optional.of((tp, fetchState))
     }
-    nextPartitionOpt
+    Optional.empty()

Review Comment:
   That was a mistake; I've updated the whole method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to