chia7712 commented on code in PR #19630:
URL: https://github.com/apache/kafka/pull/19630#discussion_r2080096579


##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -134,8 +135,9 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
   }
 
-  override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
-    partitions.map { case (tp, epochData) =>
+  override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): util.Map[TopicPartition, EpochEndOffset] = {
+    val tmpPartitions = partitions.asScala.toMap
+    tmpPartitions.map { case (tp, epochData) =>

Review Comment:
   ```
   partitions.asScala.map { case (tp, epochData) =>
   ```
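
   Presumably the point is that the temporary `tmpPartitions` value is unnecessary. A rough sketch of the suggested shape, with the per-partition body elided (`computeEndOffset` is a hypothetical stand-in for the existing logic; `asScala`/`asJava` from `scala.jdk.CollectionConverters`):
   ```scala
   // map over a Scala view of the Java map directly and convert the result back at the end,
   // with no intermediate tmpPartitions
   partitions.asScala.map { case (tp, epochData) =>
     tp -> computeEndOffset(tp, epochData) // hypothetical stand-in for the per-partition body
   }.asJava
   ```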



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -116,20 +120,23 @@ abstract class AbstractFetcherThread(name: String,
 
   private def maybeFetch(): Unit = {
     val fetchRequestOpt = inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)
+      val result = leader.buildFetch(partitionStates.partitionStateMap)
+      val fetchRequestOpt = result.result
+      val partitionsWithError = result.partitionsWithError
 
-      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
+      handlePartitionsWithErrors(partitionsWithError.asScala, "maybeFetch")
 
-      if (fetchRequestOpt.isEmpty) {
+      if (!fetchRequestOpt.isPresent) {

Review Comment:
   `fetchRequestOpt.isEmpty`
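
   Presumably referring to `java.util.Optional#isEmpty` (available since Java 11), which reads the same as the old Scala `Option` check. A sketch against the surrounding lines from the diff:
   ```scala
   // same back-off branch as above, reading the java Optional with isEmpty instead of !isPresent
   if (fetchRequestOpt.isEmpty) {
     trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
   }
   ```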



##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             .setPartition(tp.partition)
             .setErrorCode(Errors.forException(t).code)
       }
-    }
+    }.asJava
   }
 
-  override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+  override def buildFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
-      ResultWithPartitions(None, Set.empty)
+      new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
     } else {
-      selectPartitionToFetch(partitions) match {
-        case Some((tp, fetchState)) =>
-          buildFetchForPartition(tp, fetchState)
-        case None =>
-          ResultWithPartitions(None, Set.empty)
+      val selectPartition = selectPartitionToFetch(partitions)
+      if (selectPartition.isPresent) {
+        val (tp, fetchState) = selectPartition.get()
+        buildFetchForPartition(tp, fetchState)
+      } else {
+        new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
       }
     }
   }
 
-  private def selectPartitionToFetch(partitions: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+  private def selectPartitionToFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
    // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
    // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the
    // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
    // becomes unavailable or is removed.
 
     inProgressPartition.foreach { tp =>
-      val fetchStateOpt = partitions.get(tp)
+      val fetchStateOpt = Option(partitions.get(tp))
       fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
-        return Some((tp, fetchState))
+        return Optional.of((tp, fetchState))
       }
     }
 
     inProgressPartition = None
 
-    val nextPartitionOpt = nextReadyPartition(partitions)
+    val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
     nextPartitionOpt.foreach { case (tp, fetchState) =>
-      inProgressPartition = Some(tp)
+      inProgressPartition = Option(tp)
       info(s"Beginning/resuming copy of partition $tp from offset 
${fetchState.fetchOffset}. " +
         s"Including this partition, there are ${partitions.size} remaining 
partitions to copy by this thread.")
+      return Optional.of((tp, fetchState))
     }
-    nextPartitionOpt
+    Optional.empty()

Review Comment:
   ?
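
   One possible reading of the question: the early `return Optional.of(...)` plus the trailing `Optional.empty()` could be avoided by converting the `Option` result directly. A sketch, assuming `scala.jdk.OptionConverters._` is in scope:
   ```scala
   // keep the original flow and convert the Scala Option to java.util.Optional at the end
   val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
   nextPartitionOpt.foreach { case (tp, fetchState) =>
     inProgressPartition = Some(tp)
     info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
       s"Including this partition, there are ${partitions.size} remaining partitions to copy by this thread.")
   }
   nextPartitionOpt.toJava
   ```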



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -136,12 +136,13 @@ private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffs
         // Find the end-offset for the epoch earlier to the given epoch from the leader
         Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
         partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
-        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(CollectionConverters.asScala(partitionsWithEpochs)).get(partition);
-        if (maybeEpochEndOffset.isEmpty()) {
+        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> endOffsets = leader.fetchEpochEndOffsets(partitionsWithEpochs);

Review Comment:
   ```java
   var epochEndOffset = leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition);
   ```
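
   Note that a plain `Map#get` returns `null` when the partition is missing, so the old `maybeEpochEndOffset.isEmpty()` check would presumably become a `null` check on `epochEndOffset`.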



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -204,11 +214,27 @@ abstract class AbstractFetcherThread(name: String,
    * occur during truncation.
    */
   private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
-    val endOffsets = leader.fetchEpochEndOffsets(latestEpochsForPartitions)
-    //Ensure we hold a lock during truncation.
+
+    val partitionsMap = new java.util.HashMap[TopicPartition, OffsetForLeaderPartition]()
+
+    // Fill it with converted values
+    latestEpochsForPartitions.foreach { case (tp, epochData) =>

Review Comment:
   why not using `latestEpochsForPartitions.asJava`?
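
   A sketch of that suggestion, assuming `EpochData` is an alias for `OffsetForLeaderEpochRequestData.OffsetForLeaderPartition` (as the manual conversion above implies) and `scala.jdk.CollectionConverters._` is in scope:
   ```scala
   // convert the existing Scala map instead of filling a java HashMap by hand
   val endOffsets = leader.fetchEpochEndOffsets(latestEpochsForPartitions.asJava)
   ```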



##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             .setPartition(tp.partition)
             .setErrorCode(Errors.forException(t).code)
       }
-    }
+    }.asJava
   }
 
-  override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+  override def buildFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
-      ResultWithPartitions(None, Set.empty)
+      new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
     } else {
-      selectPartitionToFetch(partitions) match {
-        case Some((tp, fetchState)) =>
-          buildFetchForPartition(tp, fetchState)
-        case None =>
-          ResultWithPartitions(None, Set.empty)
+      val selectPartition = selectPartitionToFetch(partitions)
+      if (selectPartition.isPresent) {
+        val (tp, fetchState) = selectPartition.get()
+        buildFetchForPartition(tp, fetchState)
+      } else {
+        new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
       }
     }
   }
 
-  private def selectPartitionToFetch(partitions: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+  private def selectPartitionToFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
    // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
    // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the
    // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
    // becomes unavailable or is removed.
 
     inProgressPartition.foreach { tp =>
-      val fetchStateOpt = partitions.get(tp)
+      val fetchStateOpt = Option(partitions.get(tp))
       fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
-        return Some((tp, fetchState))
+        return Optional.of((tp, fetchState))
       }
     }
 
     inProgressPartition = None
 
-    val nextPartitionOpt = nextReadyPartition(partitions)
+    val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
     nextPartitionOpt.foreach { case (tp, fetchState) =>
-      inProgressPartition = Some(tp)
+      inProgressPartition = Option(tp)

Review Comment:
   why?



##########
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##########
@@ -126,14 +128,15 @@ class RemoteLeaderEndPoint(logPrefix: String,
     }
   }
 
-  override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+  override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = {
+    val tmpPartitions = partitions.asScala.toMap
     if (partitions.isEmpty) {
       debug("Skipping leaderEpoch request since all partitions do not have an 
epoch")
-      return Map.empty
+      return java.util.Collections.emptyMap()

Review Comment:
   `java.util.Map.of()`
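
   `java.util.Map.of()` (Java 9+) returns an equivalent immutable empty map without going through the `Collections` helper.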



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -116,20 +120,23 @@ abstract class AbstractFetcherThread(name: String,
 
   private def maybeFetch(): Unit = {
     val fetchRequestOpt = inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)
+      val result = leader.buildFetch(partitionStates.partitionStateMap)
+      val fetchRequestOpt = result.result
+      val partitionsWithError = result.partitionsWithError
 
-      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
+      handlePartitionsWithErrors(partitionsWithError.asScala, "maybeFetch")
 
-      if (fetchRequestOpt.isEmpty) {
+      if (!fetchRequestOpt.isPresent) {
         trace(s"There are no active partitions. Back off for $fetchBackOffMs 
ms before sending a fetch request")
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
 
       fetchRequestOpt
     }
 
-    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
-      processFetchRequest(sessionPartitions, fetchRequest)
+    if (fetchRequestOpt.isPresent) {

Review Comment:
   ```scala
   fetchRequestOpt.ifPresent(replicaFetch => processFetchRequest(replicaFetch.partitionData, replicaFetch.fetchRequest))
   ```
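
   This assumes the Java-side `ReplicaFetch` exposes `partitionData` and `fetchRequest` accessors, as the snippet implies; since `ifPresent` takes a `java.util.function.Consumer`, the Scala lambda converts via SAM conversion.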



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -218,16 +244,25 @@ abstract class AbstractFetcherThread(name: String,
         curPartitionState != null && leaderEpochInRequest == 
curPartitionState.currentLeaderEpoch
       }
 
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
-      handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
+      val result = maybeTruncateToEpochEndOffsets(epochEndOffsets.toMap, latestEpochsForPartitions)

Review Comment:
   ```scala
         val result = maybeTruncateToEpochEndOffsets(epochEndOffsets.toMap, latestEpochsForPartitions)
         handlePartitionsWithErrors(result.partitionsWithError.asScala, "truncateToEpochEndOffsets")
         updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
   ```



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -151,14 +158,17 @@ abstract class AbstractFetcherThread(name: String,
 
     partitionStates.partitionStateMap.forEach { (tp, state) =>
       if (state.isTruncating) {
-        latestEpoch(tp).toScala match {
-          case Some(epoch) =>
-            partitionsWithEpochs += tp -> new EpochData()
-              .setPartition(tp.partition)
-              .setCurrentLeaderEpoch(state.currentLeaderEpoch)
-              .setLeaderEpoch(epoch)
-          case _ =>
-            partitionsWithoutEpochs += tp
+        val latestEpochOpt = latestEpoch(tp)

Review Comment:
   Are those changes necessary?
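
   For reference, the pattern being replaced (taken from the removed lines above); if `latestEpoch` still returns a `java.util.Optional`, the `.toScala` match could presumably stay as it was:
   ```scala
   latestEpoch(tp).toScala match {
     case Some(epoch) =>
       partitionsWithEpochs += tp -> new EpochData()
         .setPartition(tp.partition)
         .setCurrentLeaderEpoch(state.currentLeaderEpoch)
         .setLeaderEpoch(epoch)
     case _ =>
       partitionsWithoutEpochs += tp
   }
   ```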



##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             .setPartition(tp.partition)
             .setErrorCode(Errors.forException(t).code)
       }
-    }
+    }.asJava
   }
 
-  override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+  override def buildFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
     // Only include replica in the fetch request if it is not throttled.
     if (quota.isQuotaExceeded) {
-      ResultWithPartitions(None, Set.empty)
+      new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
     } else {
-      selectPartitionToFetch(partitions) match {
-        case Some((tp, fetchState)) =>
-          buildFetchForPartition(tp, fetchState)
-        case None =>
-          ResultWithPartitions(None, Set.empty)
+      val selectPartition = selectPartitionToFetch(partitions)
+      if (selectPartition.isPresent) {
+        val (tp, fetchState) = selectPartition.get()
+        buildFetchForPartition(tp, fetchState)
+      } else {
+        new ResultWithPartitions(util.Optional.empty[ReplicaFetch](), util.Collections.emptySet[TopicPartition]())
       }
     }
   }
 
-  private def selectPartitionToFetch(partitions: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+  private def selectPartitionToFetch(partitions: util.Map[TopicPartition, PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
    // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
    // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the
    // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
    // becomes unavailable or is removed.
 
     inProgressPartition.foreach { tp =>
-      val fetchStateOpt = partitions.get(tp)
+      val fetchStateOpt = Option(partitions.get(tp))
       fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
-        return Some((tp, fetchState))
+        return Optional.of((tp, fetchState))
       }
     }
 
     inProgressPartition = None
 
-    val nextPartitionOpt = nextReadyPartition(partitions)
+    val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
     nextPartitionOpt.foreach { case (tp, fetchState) =>
-      inProgressPartition = Some(tp)
+      inProgressPartition = Option(tp)
       info(s"Beginning/resuming copy of partition $tp from offset 
${fetchState.fetchOffset}. " +
         s"Including this partition, there are ${partitions.size} remaining 
partitions to copy by this thread.")
+      return Optional.of((tp, fetchState))

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
