chia7712 commented on code in PR #18801:
URL: https://github.com/apache/kafka/pull/18801#discussion_r1973109296


##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +352,75 @@ class KRaftMetadataCache(
     
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
   }
 
-  override def getAliveBrokers(): Iterable[BrokerMetadata] = 
getAliveBrokers(_currentImage)
+  override def getAliveBrokers(): util.List[BrokerMetadata] = 
getAliveBrokers(_currentImage)
 
-  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] 
= {
-    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      map(b => new BrokerMetadata(b.id, b.rack))
+  private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] 
= {
+    _currentImage.cluster().brokers().values().stream()

Review Comment:
   `_currentImage` -> `image`



##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+    /**
+     * Return topic metadata for a given set of topics and listener. See 
KafkaApis#handleTopicMetadataRequest for details
+     * on the use of the two boolean flags.
+     *
+     * @param topics                      The set of topics.
+     * @param listenerName                The listener name.
+     * @param errorUnavailableEndpoints   If true, we return an error on 
unavailable brokers. This is used to support
+     *                                    MetadataResponse version 0.
+     * @param errorUnavailableListeners   If true, return LEADER_NOT_AVAILABLE 
if the listener is not found on the leader.
+     *                                    This is used for MetadataResponse 
versions 0-5.
+     * @return                            A collection of topic metadata.
+     */
+    List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners);
+
+    Set<String> getAllTopics();
+
+    Set<TopicPartition> getTopicPartitions(String topicName);
+
+    boolean hasAliveBroker(int brokerId);
+
+    List<BrokerMetadata> getAliveBrokers();
+
+    Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+    boolean isBrokerFenced(int brokerId);
+
+    boolean isBrokerShuttingDown(int brokerId);
+
+    Uuid getTopicId(String topicName);
+
+    Optional<String> getTopicName(Uuid topicId);
+
+    Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+    List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+    List<Node> getBrokerNodes(ListenerName listenerName);
+
+    Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+    /**
+     * Return the number of partitions in the given topic, or None if the 
given topic does not exist.
+     */
+    Optional<Integer> numPartitions(String topic);
+
+    Map<String, Uuid> topicNamesToIds();

Review Comment:
   it is useless - maybe we can remove it to simplify the interface. 



##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+    /**
+     * Return topic metadata for a given set of topics and listener. See 
KafkaApis#handleTopicMetadataRequest for details
+     * on the use of the two boolean flags.
+     *
+     * @param topics                      The set of topics.
+     * @param listenerName                The listener name.
+     * @param errorUnavailableEndpoints   If true, we return an error on 
unavailable brokers. This is used to support
+     *                                    MetadataResponse version 0.
+     * @param errorUnavailableListeners   If true, return LEADER_NOT_AVAILABLE 
if the listener is not found on the leader.
+     *                                    This is used for MetadataResponse 
versions 0-5.
+     * @return                            A collection of topic metadata.
+     */
+    List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners);
+
+    Set<String> getAllTopics();
+
+    Set<TopicPartition> getTopicPartitions(String topicName);
+
+    boolean hasAliveBroker(int brokerId);
+
+    List<BrokerMetadata> getAliveBrokers();
+
+    Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+    boolean isBrokerFenced(int brokerId);
+
+    boolean isBrokerShuttingDown(int brokerId);
+
+    Uuid getTopicId(String topicName);
+
+    Optional<String> getTopicName(Uuid topicId);
+
+    Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+    List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+    List<Node> getBrokerNodes(ListenerName listenerName);
+
+    Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+    /**
+     * Return the number of partitions in the given topic, or None if the 
given topic does not exist.
+     */
+    Optional<Integer> numPartitions(String topic);
+
+    Map<String, Uuid> topicNamesToIds();
+
+    Map<Uuid, String> topicIdsToNames();
+
+    Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo();

Review Comment:
   ditto



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,35 +239,37 @@ class KRaftMetadataCache(
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: Set[String],
+  override def getTopicMetadata(topics: util.Set[String],
                                 listenerName: ListenerName,
                                 errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponseTopic] = {
+                                errorUnavailableListeners: Boolean = false): 
util.List[MetadataResponseTopic] = {
     val image = _currentImage
-    topics.toSeq.flatMap { topic =>
-      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
-        new MetadataResponseTopic()
-          .setErrorCode(Errors.NONE.code)
-          .setName(topic)
-          
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
-          .setIsInternal(Topic.isInternal(topic))
-          .setPartitions(partitionMetadata.toBuffer.asJava)
+    topics.stream().flatMap(topic =>
+      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners) match {
+        case Some(partitionMetadata) =>
+          util.stream.Stream.of(new MetadataResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topic)
+            
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topic))
+            .setPartitions(partitionMetadata.toBuffer.asJava))
+        case None =>  util.stream.Stream.empty()

Review Comment:
   please remove the redundant space



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -290,7 +290,7 @@ default void waitForTopic(String topic, int partitions) 
throws InterruptedExcept
         TestUtils.waitForCondition(
             () -> brokers.stream().allMatch(broker -> partitions == 0 ?
                 broker.metadataCache().numPartitions(topic).isEmpty() :
-                
broker.metadataCache().numPartitions(topic).contains(partitions)
+                broker.metadataCache().numPartitions(topic).get() == partitions

Review Comment:
   `broker.metadataCache().numPartitions(topic).filter(p -> p == 
partitions).isPresent()`



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,35 +239,37 @@ class KRaftMetadataCache(
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: Set[String],
+  override def getTopicMetadata(topics: util.Set[String],
                                 listenerName: ListenerName,
                                 errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponseTopic] = {
+                                errorUnavailableListeners: Boolean = false): 
util.List[MetadataResponseTopic] = {
     val image = _currentImage
-    topics.toSeq.flatMap { topic =>
-      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
-        new MetadataResponseTopic()
-          .setErrorCode(Errors.NONE.code)
-          .setName(topic)
-          
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
-          .setIsInternal(Topic.isInternal(topic))
-          .setPartitions(partitionMetadata.toBuffer.asJava)
+    topics.stream().flatMap(topic =>
+      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners) match {
+        case Some(partitionMetadata) =>
+          util.stream.Stream.of(new MetadataResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topic)
+            
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topic))
+            .setPartitions(partitionMetadata.toBuffer.asJava))
+        case None =>  util.stream.Stream.empty()
       }
-    }
+    ).collect(Collectors.toList())
   }
 
   override def describeTopicResponse(
-    topics: Iterator[String],
+    topics: util.Iterator[String],
     listenerName: ListenerName,
-    topicPartitionStartIndex: String => Int,
+    topicPartitionStartIndex: util.function.Function[String, Integer],
     maximumNumberOfPartitions: Int,
     ignoreTopicsWithExceptions: Boolean
   ): DescribeTopicPartitionsResponseData = {
     val image = _currentImage
     var remaining = maximumNumberOfPartitions
     val result = new DescribeTopicPartitionsResponseData()
     breakable {
-      topics.foreach { topicName =>
+      topics.asScala.foreach { topicName =>

Review Comment:
   `forEachRemaining`



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +352,75 @@ class KRaftMetadataCache(
     
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
   }
 
-  override def getAliveBrokers(): Iterable[BrokerMetadata] = 
getAliveBrokers(_currentImage)
+  override def getAliveBrokers(): util.List[BrokerMetadata] = 
getAliveBrokers(_currentImage)
 
-  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] 
= {
-    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      map(b => new BrokerMetadata(b.id, b.rack))
+  private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] 
= {
+    _currentImage.cluster().brokers().values().stream()
+      .filter(Predicate.not(_.fenced))
+      .map(broker => new BrokerMetadata(broker.id, broker.rack))
+      .collect(Collectors.toList())
   }
 
-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
-    Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
-      flatMap(_.node(listenerName.value()).toScala)
+  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
util.Optional[Node] = {
+    util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
+      .filter(Predicate.not(_.fenced))
+      .flatMap(broker => broker.node(listenerName.value))
   }
 
-  override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
-    _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      flatMap(_.node(listenerName.value()).toScala).toSeq
+  override def getAliveBrokerNodes(listenerName: ListenerName): 
util.List[Node] = {
+    _currentImage.cluster.brokers.values.stream
+      .filter(Predicate.not(_.fenced))
+      .flatMap(broker => broker.node(listenerName.value).stream)
+      .collect(Collectors.toList())
   }
 
-  override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = {
-    
_currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).toScala).toSeq
+  override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = {
+    _currentImage.cluster.brokers.values.stream
+      .flatMap(broker => broker.node(listenerName.value).stream)
+      .collect(Collectors.toList())
   }
 
-  override def getLeaderAndIsr(topicName: String, partitionId: Int): 
Option[LeaderAndIsr] = {
-    Option(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => Option(topic.partitions().get(partitionId))).
-      flatMap(partition => Some(new LeaderAndIsr(partition.leader, 
partition.leaderEpoch,
+  override def getLeaderAndIsr(topicName: String, partitionId: Int): 
util.Optional[LeaderAndIsr] = {
+    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
+      flatMap(topic => 
util.Optional.ofNullable(topic.partitions().get(partitionId))).
+      flatMap(partition => util.Optional.ofNullable(new 
LeaderAndIsr(partition.leader, partition.leaderEpoch,
         util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*), 
partition.leaderRecoveryState, partition.partitionEpoch)))
   }
 
-  override def numPartitions(topicName: String): Option[Int] = {
-    Option(_currentImage.topics().getTopic(topicName)).
+  override def numPartitions(topicName: String): util.Optional[Integer] = {
+    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
       map(topic => topic.partitions().size())
   }
 
   override def topicNamesToIds(): util.Map[String, Uuid] = 
_currentImage.topics.topicNameToIdView()
 
   override def topicIdsToNames(): util.Map[Uuid, String] = 
_currentImage.topics.topicIdToNameView()
 
-  override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) 
= {
+  override def topicIdInfo(): util.Map.Entry[util.Map[String, Uuid], 
util.Map[Uuid, String]] = {
     val image = _currentImage
-    (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
+    new util.AbstractMap.SimpleEntry(image.topics.topicNameToIdView(), 
image.topics.topicIdToNameView())
   }
 
   // if the leader is not known, return None;
   // if the leader is known and corresponding node is available, return 
Some(node)
   // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
-  override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int, 
listenerName: ListenerName): Option[Node] = {
+  override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int, 
listenerName: ListenerName): util.Optional[Node] = {
     val image = _currentImage
     Option(image.topics().getTopic(topicName)) match {
-      case None => None
+      case None => util.Optional.empty()
       case Some(topic) => Option(topic.partitions().get(partitionId)) match {
-        case None => None
+        case None => util.Optional.empty()
         case Some(partition) => 
Option(image.cluster().broker(partition.leader)) match {
-          case None => Some(Node.noNode)
-          case Some(broker) => 
Some(broker.node(listenerName.value()).orElse(Node.noNode()))
+          case None => util.Optional.of(Node.noNode)
+          case Some(broker) => 
util.Optional.of(broker.node(listenerName.value()).orElse(Node.noNode()))
         }
       }
     }
   }
 
-  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node] = {
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): util.Map[Integer, Node] = {
     val image = _currentImage
-    val result = new mutable.HashMap[Int, Node]()
+    val result = new mutable.HashMap[Integer, Node]()

Review Comment:
   could you please use java Map instead?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
   }
 
   private def getTransactionCoordinator(partition: Int): Option[Node] = {

Review Comment:
   we can use Java optional as it has only one usage



##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+    /**
+     * Return topic metadata for a given set of topics and listener. See 
KafkaApis#handleTopicMetadataRequest for details
+     * on the use of the two boolean flags.
+     *
+     * @param topics                      The set of topics.
+     * @param listenerName                The listener name.
+     * @param errorUnavailableEndpoints   If true, we return an error on 
unavailable brokers. This is used to support
+     *                                    MetadataResponse version 0.
+     * @param errorUnavailableListeners   If true, return LEADER_NOT_AVAILABLE 
if the listener is not found on the leader.
+     *                                    This is used for MetadataResponse 
versions 0-5.
+     * @return                            A collection of topic metadata.
+     */
+    List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners);
+
+    Set<String> getAllTopics();
+
+    Set<TopicPartition> getTopicPartitions(String topicName);
+
+    boolean hasAliveBroker(int brokerId);
+
+    List<BrokerMetadata> getAliveBrokers();
+
+    Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+    boolean isBrokerFenced(int brokerId);
+
+    boolean isBrokerShuttingDown(int brokerId);
+
+    Uuid getTopicId(String topicName);
+
+    Optional<String> getTopicName(Uuid topicId);
+
+    Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+    List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+    List<Node> getBrokerNodes(ListenerName listenerName);
+
+    Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+    /**
+     * Return the number of partitions in the given topic, or None if the 
given topic does not exist.
+     */
+    Optional<Integer> numPartitions(String topic);
+
+    Map<String, Uuid> topicNamesToIds();
+
+    Map<Uuid, String> topicIdsToNames();
+
+    Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo();
+
+    /**
+     * Get a partition leader's endpoint
+     *
+     * @return  If the leader is known, and the listener name is available, 
return Some(node). If the leader is known,
+     *          but the listener is unavailable, return Some(Node.NO_NODE). 
Otherwise, if the leader is not known,
+     *          return None
+     */
+    Optional<Node> getPartitionLeaderEndpoint(String topic, int partitionId, 
ListenerName listenerName);
+
+    Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, 
ListenerName listenerName);
+
+    Cluster getClusterMetadata(String clusterId, ListenerName listenerName);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to