mumrah commented on code in PR #18801:
URL: https://github.com/apache/kafka/pull/18801#discussion_r1965712115
##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -821,17 +821,19 @@ object TestUtils extends Logging {
     waitUntilTrue(
       () => brokers.forall { broker =>
         if (expectedNumPartitions == 0) {
-          broker.metadataCache.numPartitions(topic) == None
+          broker.metadataCache.numPartitions(topic).isEmpty
         } else {
-          broker.metadataCache.numPartitions(topic) == Some(expectedNumPartitions)
+          broker.metadataCache.numPartitions(topic).isPresent && broker.metadataCache.numPartitions(topic).get == expectedNumPartitions

Review Comment:
   I think this can be simplified to use `filter` or maybe `orElse(null) ==` (a sketch follows at the end of this message).



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,7 +2385,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       () => {
        val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
        val describeClusterRequest = request.body[DescribeClusterRequest]
-       metadataCache.getBrokerNodes(request.context.listenerName).foreach { node =>
+       metadataCache.getBrokerNodes(request.context.listenerName).asScala.foreach { node =>

Review Comment:
   Do we need to convert to Scala for the `foreach` here? Could we use the Java stream and its `forEach` instead? (Sketch below.)



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -320,18 +320,23 @@ class KRaftMetadataCache(
     result
   }

-  override def getAllTopics(): Set[String] = _currentImage.topics().topicsByName().keySet().asScala
+  override def getAllTopics(): util.Set[String] = _currentImage.topics().topicsByName().keySet()

-  override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
-    Option(_currentImage.topics().getTopic(topicName)) match {
-      case None => Set.empty
-      case Some(topic) => topic.partitions().keySet().asScala.map(new TopicPartition(topicName, _))
+  override def getTopicPartitions(topicName: String): util.Set[TopicPartition] = {
+    val topic = _currentImage.topics().getTopic(topicName)
+    if (topic == null) {
+      return util.Set.of;
     }
+    topic.partitions.keySet.stream
+      .map(partitionId => new TopicPartition(topicName, partitionId))
+      .collect(Collectors.toSet());
   }

-  override def getTopicId(topicName: String): Uuid = _currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID)
+  override def getTopicId(topicName: String): Uuid = util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
+    .map(topic => topic.id)

Review Comment:
   We can use the method reference style here: `.map(_.id)`



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1193,8 +1195,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         val coordinatorEndpoint = topicMetadata.head.partitions.asScala
           .find(_.partitionIndex == partition)
           .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-          .flatMap(metadata => metadataCache.
-            getAliveBrokerNode(metadata.leaderId, request.context.listenerName))
+          .flatMap(metadata => OptionConverters.toScala(metadataCache.

Review Comment:
   Can we let `topicMetadata` stay as a Java collection and convert this whole stream to a Java stream? (Sketch below.)
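To make the TestUtils.scala suggestion concrete, here is a minimal standalone sketch (not taken from the PR) of both forms, using a plain `java.util.Optional[Integer]` as a stand-in for the new return type of `numPartitions(topic)`:

```scala
import java.util.Optional

// Stand-in for broker.metadataCache.numPartitions(topic), which now
// returns a Java Optional instead of a Scala Option.
val numPartitions: Optional[Integer] = Optional.of(3)
val expectedNumPartitions = 3

// filter + isPresent collapses the isPresent && get pattern into one check:
val matched = numPartitions.filter(_ == expectedNumPartitions).isPresent

// ...or compare the unwrapped value, using null as the "absent" marker
// (Scala's == on a boxed Integer and an Int compares numerically):
val matchedAlt = numPartitions.orElse(null) == expectedNumPartitions
```

Both variants evaluate to false for an empty `Optional`, which matches the `expectedNumPartitions != 0` branch being rewritten.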
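For the `getBrokerNodes` comment, a tiny self-contained illustration of both options, the Java stream's `forEach` and the `forEach` that `java.util.List` inherits from `Iterable`, each taking a Scala lambda through SAM conversion:

```scala
import java.util.List

val nodes: List[String] = List.of("broker-0", "broker-1")

// The Java stream's forEach, as suggested in the comment:
nodes.stream.forEach(node => println(s"adding $node"))

// java.util.List also inherits forEach from Iterable, so the stream
// can be skipped entirely:
nodes.forEach(node => println(s"adding $node"))
```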
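And for the coordinator-endpoint comment, one possible all-Java-stream shape: filter the partitions, take `findFirst`, then `Optional.flatMap` into the broker lookup. `PartitionInfo`, `aliveBrokerNode`, and `NoLeaderId` are hypothetical stand-ins for the response type, `metadataCache.getAliveBrokerNode`, and `MetadataResponse.NO_LEADER_ID`:

```scala
import java.util.{List, Optional}

final case class PartitionInfo(partitionIndex: Int, leaderId: Int) // stand-in

val NoLeaderId = -1 // stands in for MetadataResponse.NO_LEADER_ID

// Stand-in for metadataCache.getAliveBrokerNode(leaderId, listenerName).
def aliveBrokerNode(leaderId: Int): Optional[String] =
  if (leaderId != NoLeaderId) Optional.of(s"broker-$leaderId") else Optional.empty()

val partitions: List[PartitionInfo] = List.of(PartitionInfo(0, 1), PartitionInfo(1, NoLeaderId))
val partition = 0

val coordinatorEndpoint: Optional[String] =
  partitions.stream
    .filter(md => md.partitionIndex == partition)
    .filter(md => md.leaderId != NoLeaderId)
    .findFirst
    .flatMap(md => aliveBrokerNode(md.leaderId))
```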
##########
core/src/main/scala/kafka/server/DelayedElectLeader.scala:
##########
@@ -74,7 +75,7 @@ class DelayedElectLeader(
   private def updateWaiting(): Unit = {
     val metadataCache = replicaManager.metadataCache
     val completedPartitions = waitingPartitions.collect {
-      case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp
+      case (tp, leader) if OptionConverters.toScala(metadataCache.getLeaderAndIsr(tp.topic, tp.partition)).exists(_.leader == leader) => tp

Review Comment:
   The `exists` can be done in Java with `filter` + `isPresent` (sketch below).



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +350,75 @@ class KRaftMetadataCache(
     Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
   }

-  override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage)
+  override def getAliveBrokers(): util.List[BrokerMetadata] = getAliveBrokers(_currentImage)

-  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
-    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      map(b => new BrokerMetadata(b.id, b.rack))
+  private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = {
+    _currentImage.cluster.brokers.values.stream
+      .filter(broker => !broker.fenced)
+      .map(broker => new BrokerMetadata(broker.id, broker.rack))
+      .collect(Collectors.toList())
   }

-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
-    Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
-      flatMap(_.node(listenerName.value()).toScala)
+  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = {
+    util.Optional.ofNullable(_currentImage.cluster.broker(brokerId))
+      .filter(broker => !broker.fenced)

Review Comment:
   With Java 11 we can now use `Predicate.not` for negating filters with lambdas. This can be
   ```
   .filter(Predicate.not(_.fenced))
   ```



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,12 +239,12 @@ class KRaftMetadataCache(
   }

   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: Set[String],
+  override def getTopicMetadata(topics: util.Set[String],
                                 listenerName: ListenerName,
                                 errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+                                errorUnavailableListeners: Boolean = false): util.List[MetadataResponseTopic] = {
     val image = _currentImage
-    topics.toSeq.flatMap { topic =>
+    topics.asScala.toSeq.flatMap { topic =>

Review Comment:
   Can we avoid this conversion to Scala and back to a Java collection? Since `topics` is now a Java set, it seems like we can use a Java stream here (sketch below).
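To illustrate the DelayedElectLeader.scala comment: Scala's `Option.exists(p)` maps one-to-one onto `Optional.filter(p).isPresent`, so the `OptionConverters` round trip can go away. A standalone sketch with a stand-in `LeaderAndIsr`:

```scala
import java.util.Optional

final case class LeaderAndIsr(leader: Int) // stand-in for the real class

val leaderAndIsr: Optional[LeaderAndIsr] = Optional.of(LeaderAndIsr(1))
val leader = 1

// Instead of OptionConverters.toScala(...).exists(_.leader == leader):
val completed = leaderAndIsr.filter(_.leader == leader).isPresent
```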
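For the `getTopicMetadata` comment, a standalone sketch of streaming the Java set directly and collecting straight into a `util.List`. `describeTopic` is a hypothetical stand-in for the per-topic metadata lookup, and `Optional.stream` (Java 9+) makes empty results vanish inside `flatMap`:

```scala
import java.util.{Optional, Set}
import java.util.stream.Collectors

// Hypothetical stand-in for building one MetadataResponseTopic.
def describeTopic(topic: String): Optional[String] =
  if (topic.startsWith("known-")) Optional.of(s"metadata($topic)") else Optional.empty()

val topics: Set[String] = Set.of("known-foo", "unknown-bar")

val topicMetadata: java.util.List[String] =
  topics.stream
    .flatMap(topic => describeTopic(topic).stream)
    .collect(Collectors.toList())
```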
##########
core/src/main/scala/kafka/cluster/Replica.scala:
##########
@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
     replicaState.updateAndGet { currentReplicaState =>
       val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
       // Fence the update if it provides a stale broker epoch.
-      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+      if (brokerEpoch != -1 && cachedBrokerEpoch.isPresent && cachedBrokerEpoch.get > brokerEpoch) {

Review Comment:
   Simplify to `filter` + `isPresent` (sketch below).



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
   }

   private def getTransactionCoordinator(partition: Int): Option[Node] = {
-    metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+    OptionConverters.toScala(metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition))
       .filter(_.leader != MetadataResponse.NO_LEADER_ID)
-      .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
+      .flatMap(metadata => OptionConverters.toScala(metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName)))

Review Comment:
   I think this would be better if we kept the Java `Optional` throughout and converted it once at the end (sketch below).
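A standalone sketch of the `filter` + `isPresent` form suggested for Replica.scala, assuming `getAliveBrokerEpoch` now returns a `java.util.Optional[java.lang.Long]`:

```scala
import java.util.Optional

// Stand-in for metadataCache.getAliveBrokerEpoch(brokerId).
val cachedBrokerEpoch: Optional[java.lang.Long] = Optional.of(10L)
val brokerEpoch = 7L

// Instead of isPresent && get > brokerEpoch:
val staleEpoch = brokerEpoch != -1 && cachedBrokerEpoch.filter(_ > brokerEpoch).isPresent
```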
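Finally, for the AddPartitionsToTxnManager.scala comment, a sketch of keeping the whole chain in `java.util.Optional` and converting once at the end. The two `get...` helpers and `NoLeaderId` are stand-ins for the metadata-cache calls and `MetadataResponse.NO_LEADER_ID`:

```scala
import java.util.Optional
import scala.jdk.javaapi.OptionConverters

final case class LeaderAndIsr(leader: Int) // stand-in

val NoLeaderId = -1 // stands in for MetadataResponse.NO_LEADER_ID

// Stand-ins for the two metadataCache lookups in the real method.
def getLeaderAndIsr(partition: Int): Optional[LeaderAndIsr] = Optional.of(LeaderAndIsr(2))
def getAliveBrokerNode(leader: Int): Optional[String] = Optional.of(s"broker-$leader")

def getTransactionCoordinator(partition: Int): Option[String] =
  OptionConverters.toScala(
    getLeaderAndIsr(partition)
      .filter(_.leader != NoLeaderId)
      .flatMap(md => getAliveBrokerNode(md.leader)))
```

A single conversion at the Scala boundary keeps the call sites unchanged while the chain itself stays in one collection world.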