mumrah commented on code in PR #18801:
URL: https://github.com/apache/kafka/pull/18801#discussion_r1965712115


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -821,17 +821,19 @@ object TestUtils extends Logging {
     waitUntilTrue(
       () => brokers.forall { broker =>
         if (expectedNumPartitions == 0) {
-          broker.metadataCache.numPartitions(topic) == None
+          broker.metadataCache.numPartitions(topic).isEmpty
         } else {
-          broker.metadataCache.numPartitions(topic) == 
Some(expectedNumPartitions)
+          broker.metadataCache.numPartitions(topic).isPresent && 
broker.metadataCache.numPartitions(topic).get == expectedNumPartitions

Review Comment:
   I think this can be simplified to use `filter` or maybe `orElse(null) = `



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,7 +2385,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       () => {
         val brokers = new 
DescribeClusterResponseData.DescribeClusterBrokerCollection()
         val describeClusterRequest = request.body[DescribeClusterRequest]
-        metadataCache.getBrokerNodes(request.context.listenerName).foreach { 
node =>
+        
metadataCache.getBrokerNodes(request.context.listenerName).asScala.foreach { 
node =>

Review Comment:
   Do we need to convert to Scala for the `foreach` here? Could we use the Java 
stream and its `forEach` instead?



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -320,18 +320,23 @@ class KRaftMetadataCache(
     result
   }
 
-  override def getAllTopics(): Set[String] = 
_currentImage.topics().topicsByName().keySet().asScala
+  override def getAllTopics(): util.Set[String] = 
_currentImage.topics().topicsByName().keySet()
 
-  override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
-    Option(_currentImage.topics().getTopic(topicName)) match {
-      case None => Set.empty
-      case Some(topic) => topic.partitions().keySet().asScala.map(new 
TopicPartition(topicName, _))
+  override def getTopicPartitions(topicName: String): util.Set[TopicPartition] 
= {
+    val topic = _currentImage.topics().getTopic(topicName)
+    if (topic == null) {
+      return util.Set.of;
     }
+    topic.partitions.keySet.stream
+      .map(partitionId => new TopicPartition(topicName, partitionId))
+      .collect(Collectors.toSet());
   }
 
-  override def getTopicId(topicName: String): Uuid = 
_currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID)
+  override def getTopicId(topicName: String): Uuid = 
util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
+    .map(topic => topic.id)

Review Comment:
   We can use the method reference style here: `.map(_.id)`



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1193,8 +1195,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           val coordinatorEndpoint = topicMetadata.head.partitions.asScala
             .find(_.partitionIndex == partition)
             .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.
-                getAliveBrokerNode(metadata.leaderId, 
request.context.listenerName))
+            .flatMap(metadata => OptionConverters.toScala(metadataCache.

Review Comment:
   Can we let `topicMetadata` stay as as a Java collection and convert this 
whole stream to a Java stream? 
   



##########
core/src/main/scala/kafka/server/DelayedElectLeader.scala:
##########
@@ -74,7 +75,7 @@ class DelayedElectLeader(
   private def updateWaiting(): Unit = {
     val metadataCache = replicaManager.metadataCache
     val completedPartitions = waitingPartitions.collect {
-      case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, 
tp.partition).exists(_.leader == leader) => tp
+      case (tp, leader) if 
OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, 
tp.partition)).exists(_.leader == leader) => tp

Review Comment:
   The `exists` can be done in Java with `filter` + `isPresent`



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +350,75 @@ class KRaftMetadataCache(
     
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
   }
 
-  override def getAliveBrokers(): Iterable[BrokerMetadata] = 
getAliveBrokers(_currentImage)
+  override def getAliveBrokers(): util.List[BrokerMetadata] = 
getAliveBrokers(_currentImage)
 
-  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] 
= {
-    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      map(b => new BrokerMetadata(b.id, b.rack))
+  private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] 
= {
+    _currentImage.cluster.brokers.values.stream
+      .filter(broker => !broker.fenced)
+      .map(broker => new BrokerMetadata(broker.id, broker.rack))
+      .collect(Collectors.toList())
   }
 
-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
-    Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
-      flatMap(_.node(listenerName.value()).toScala)
+  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
util.Optional[Node] = {
+    util.Optional.ofNullable(_currentImage.cluster.broker(brokerId))
+      .filter(broker => !broker.fenced)

Review Comment:
   With Java 11 we can now use `Predicate.not` for negating filters with 
lambda. This can be
   
   ```
   .filter(Predicate.not(_.fenced))
   ```
   
   



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,12 +239,12 @@ class KRaftMetadataCache(
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: Set[String],
+  override def getTopicMetadata(topics: util.Set[String],
                                 listenerName: ListenerName,
                                 errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponseTopic] = {
+                                errorUnavailableListeners: Boolean = false): 
util.List[MetadataResponseTopic] = {
     val image = _currentImage
-    topics.toSeq.flatMap { topic =>
+    topics.asScala.toSeq.flatMap { topic =>

Review Comment:
   Can we avoid this conversion to Scala and back to Java collection? Since 
`topics` is now a Java set, it seems like we can use a Java stream here.



##########
core/src/main/scala/kafka/cluster/Replica.scala:
##########
@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition, val metadat
     replicaState.updateAndGet { currentReplicaState =>
       val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
       // Fence the update if it provides a stale broker epoch.
-      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+      if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && 
cachedBrokerEpoch.get > brokerEpoch) {

Review Comment:
   Simplify to `filter` + `isPresent`



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
   }
 
   private def getTransactionCoordinator(partition: Int): Option[Node] = {
-   metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+   
OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME,
 partition))
       .filter(_.leader != MetadataResponse.NO_LEADER_ID)
-      .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, 
interBrokerListenerName))
+      .flatMap(metadata => 
OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, 
interBrokerListenerName)))

Review Comment:
   I think this would be better if we keep the Java Optional and convert it at 
the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to