FrankYang0529 commented on code in PR #18801: URL: https://github.com/apache/kafka/pull/18801#discussion_r1997644083
########## metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeClientQuotasRequestData; +import org.apache.kafka.common.message.DescribeClientQuotasResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.FinalizedFeatures; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + +public interface MetadataCache extends ConfigRepository { + + /** + * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details + * on the use of the two boolean flags. + * + * @param topics The set of topics. + * @param listenerName The listener name. + * @param errorUnavailableEndpoints If true, we return an error on unavailable brokers. This is used to support + * MetadataResponse version 0. + * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE if the listener is not found on the leader. + * This is used for MetadataResponse versions 0-5. + * @return A collection of topic metadata. + */ + List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata( + Set<String> topics, + ListenerName listenerName, + boolean errorUnavailableEndpoints, + boolean errorUnavailableListeners); + + Set<String> getAllTopics(); + + boolean hasAliveBroker(int brokerId); + + Optional<Long> getAliveBrokerEpoch(int brokerId); + + boolean isBrokerFenced(int brokerId); + + boolean isBrokerShuttingDown(int brokerId); + + Uuid getTopicId(String topicName); + + Optional<String> getTopicName(Uuid topicId); + + Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName); + + List<Node> getAliveBrokerNodes(ListenerName listenerName); + + List<Node> getBrokerNodes(ListenerName listenerName); + + Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId); + + /** + * Return the number of partitions in the given topic, or None if the given topic does not exist. + */ + Optional<Integer> numPartitions(String topic); + + Map<Uuid, String> topicIdsToNames(); + + /** + * Get a partition leader's endpoint + * + * @return If the leader is known, and the listener name is available, return Some(node). If the leader is known, + * but the listener is unavailable, return Some(Node.NO_NODE). Otherwise, if the leader is not known, + * return None + */ + Optional<Node> getPartitionLeaderEndpoint(String topic, int partitionId, ListenerName listenerName); + + Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName); + + boolean contains(String topic); + + boolean contains(TopicPartition tp); + + MetadataVersion metadataVersion(); + + Optional<Integer> getRandomAliveBrokerId(); + + FinalizedFeatures features(); + + DescribeClientQuotasResponseData describeClientQuotas(DescribeClientQuotasRequestData request); + + DescribeUserScramCredentialsResponseData describeScramCredentials(DescribeUserScramCredentialsRequestData request); + + /** + * Get the topic metadata for the given topics. + * + * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first + * partition can't be returned due the limit. + * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response. + * + * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData + * will also be sorted in alphabetical order. + * + * @param topics The iterator of topics and their corresponding first partition id to fetch. + * @param listenerName The listener name. + * @param topicPartitionStartIndex The start partition index for the first topic + * @param maximumNumberOfPartitions The max number of partitions to return. + * @param ignoreTopicsWithExceptions Whether ignore the topics with exception. + */ + DescribeTopicPartitionsResponseData describeTopicResponse( + Iterator<String> topics, + ListenerName listenerName, + Function<String, Integer> topicPartitionStartIndex, + int maximumNumberOfPartitions, + boolean ignoreTopicsWithExceptions); + + static Cluster toCluster(String clusterId, MetadataImage image) { + Map<Integer, List<Node>> brokerToNodes = new HashMap<>(); + image.cluster().brokers().values().stream() + .filter(broker -> !broker.fenced()) + .forEach(broker -> brokerToNodes.put(broker.id(), broker.nodes())); + + List<PartitionInfo> partitionInfos = new ArrayList<>(); + Set<String> internalTopics = new HashSet<>(); + + image.topics().topicsByName().values().forEach(topic -> { + topic.partitions().forEach((partitionId, partition) -> { + List<Node> nodes = brokerToNodes.get(partition.leader); + if (nodes != null) { + nodes.forEach(node -> { + partitionInfos.add(new PartitionInfo( + topic.name(), + partitionId, + node, + toArray(partition.replicas, brokerToNodes), + toArray(partition.isr, brokerToNodes), + getOfflineReplicas(image, partition).stream() + .map(brokerToNodes::get) + .flatMap(Collection::stream) + .toArray(Node[]::new) + )); + }); + if (Topic.isInternal(topic.name())) { + internalTopics.add(topic.name()); + } + } + }); + }); + + Node controllerNode = Optional.ofNullable(brokerToNodes.get(getRandomAliveBroker(image).orElse(-1))) + .map(nodes -> nodes.get(0)) + .orElse(Node.noNode()); + + return new Cluster( + clusterId, + brokerToNodes.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), + partitionInfos, + Collections.emptySet(), + internalTopics, + controllerNode + ); + } + + private static Node[] toArray(int[] replicas, Map<Integer, List<Node>> brokerToNodes) { + return Arrays.stream(replicas) + .mapToObj(brokerToNodes::get) + .flatMap(Collection::stream) + .toArray(Node[]::new); + } + + private static List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition) { + List<Integer> offlineReplicas = new ArrayList<>(); + for (int brokerId : partition.replicas) { + BrokerRegistration broker = image.cluster().broker(brokerId); + if (broker == null || isReplicaOffline(partition, broker)) { + offlineReplicas.add(brokerId); + } + } + return offlineReplicas; + } + + private static boolean isReplicaOffline(PartitionRegistration partition, BrokerRegistration broker) { + return broker.fenced() || !broker.hasOnlineDir(partition.directory(broker.id())); + } + + private static Optional<Integer> getRandomAliveBroker(MetadataImage image) { + List<BrokerMetadata> aliveBrokers = getAliveBrokers(image); + if (aliveBrokers.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size())).id); + } + } + + private static List<BrokerMetadata> getAliveBrokers(MetadataImage image) { Review Comment: Update it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org