rajinisivaram commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r566449639
##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions = new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    /**
+     * The number of partitions for all topics which exist in the fetch request session.
+     */
+    private Map<String, Integer> sessionPartitionsPerTopic = new HashMap<>(0);
+
+    public Map<Uuid, String> getSessionTopicNames() {

Review comment:
   We don't use `get` prefix for getters

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Map<String, Uuid> topicIds() {
+        return cache.topicIds();
+    }
+
+    public synchronized Map<Uuid, String> topicNames() {
+        return cache.topicNames();
+    }

Review comment:
   There are several places where we use this combination of two maps, should we create a class that maintains a bidirectional map?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -254,8 +255,14 @@ public synchronized int sendFetches() {
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
+            final short maxVersion;
+            if (ApiKeys.FETCH.latestVersion() >= 13 && !data.canUseTopicIds()) {

Review comment:
   The fact that you are running this code implies `ApiKeys.FETCH.latestVersion() >= 13`?
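On the bidirectional-map suggestion above: a minimal sketch of what such a helper could look like. The class name `TopicIdBiMap` and its methods are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`; this is not the PR's code, just an illustration of keeping the two views in sync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper keeping the name -> id and id -> name views consistent.
final class TopicIdBiMap {
    private final Map<String, UUID> topicIds = new HashMap<>();
    private final Map<UUID, String> topicNames = new HashMap<>();

    void put(String topic, UUID id) {
        UUID oldId = topicIds.put(topic, id);
        if (oldId != null)
            topicNames.remove(oldId); // drop the stale reverse mapping
        topicNames.put(id, topic);
    }

    UUID idFor(String topic) {
        return topicIds.get(topic);
    }

    String nameFor(UUID id) {
        return topicNames.get(id);
    }
}
```

A class along these lines would also make it harder for the paired `sessionTopicIds`/`sessionTopicNames` maps to drift apart, since every update goes through one method.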
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -347,9 +432,18 @@ public int sessionId() {
             partitionResponses.add(partitionData.partitionResponse);
             topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
                 .setTopic(entry.getKey().topic())
+                .setTopicId(topicIds.getOrDefault(entry.getKey().topic(), Uuid.ZERO_UUID))
                 .setPartitionResponses(partitionResponses));
         }
     });
+    // ID errors will be empty unless topic IDs are supported and there were topic ID errors

Review comment:
   It will be good to see if we can separate out new and old forms of FetchRequest/Response. It is not a big deal since it is just wrapping the protocol layer.

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId,
                         Set<String> addInvalidTopics,
                         Set<String> addInternalTopics,
                         Node newController,
+                        Map<String, Uuid> topicIds,
                         BiPredicate<String, Boolean> retainTopic) {
     Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
     Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+    Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+    // We want the most recent topic ID. We add the old one here and replace if a new topic ID is added
+    // or remove if the request did not support topic IDs for this topic.
+    for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+        if (shouldRetainTopic.test(entry.getKey())) {
+            newTopicIds.put(entry.getKey(), entry.getValue());
+        }
+    }
+
     for (PartitionMetadata partition : addPartitions) {
         newMetadataByPartition.put(partition.topicPartition, partition);
+        Uuid id = topicIds.get(partition.topic());
+        if (id != null)
+            newTopicIds.put(partition.topic(), id);
+        else
+            // Remove if the latest metadata does not have a topic ID
+            newTopicIds.remove(partition.topic());

Review comment:
   Can we end up with cases with some topics with ids and some without?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
     }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+  val partition: Int,

Review comment:
   nit: indentation

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -89,19 +111,40 @@ public FetchSessionHandler(LogContext logContext, int node) {
      */
     private final Map<TopicPartition, PartitionData> sessionPartitions;
 
+    /**
+     * All of the topic IDs for topics which exist in the fetch request.
+     */
+    private final Map<String, Uuid> topicIds;
+
+    /**
+     * All of the topic names for the topic IDs which exist in the fetch request
+     */
+    private final Map<Uuid, String> topicNames;
+
     /**
      * The metadata to use in this fetch request.
      */
     private final FetchMetadata metadata;
 
+    /**
+     * The topics in this fetch request

Review comment:
   Comment needs updating?

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -113,7 +113,9 @@ object ApiVersion {
     // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
     KAFKA_2_8_IV0,
     // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
-    KAFKA_2_8_IV1
+    KAFKA_2_8_IV1,
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    KAFKA_2_8_IV2

Review comment:
   We need to remember to set this based on which version this is being merged to.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -259,8 +262,49 @@ public T records() {
         }
     }
 
+    public static final class IdError {
+        private final Uuid id;
+        private final Set<Integer> partitions;
+        private final Errors error;
+
+        public IdError(Uuid id,
+                       List<Integer> partitions,
+                       Errors error) {
+            this.id = id;
+            this.partitions = new HashSet<>(partitions);
+            this.error = error;
+        }
+
+        public Uuid id() {
+            return this.id;
+        }
+
+        public Set<Integer> partitions() {
+            return this.partitions;
+        }
+
+        public void addPartitions(List<Integer> partitions) {
+            partitions.forEach(partition -> {
+                partitions.add(partition);
+            });

Review comment:
   `this.partitions.addAll(partitions)`?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,7 +116,68 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    public static final class UnresolvedPartitions {

Review comment:
   The whole FetchRequest class is quite hard to follow without reading the KIP and looking at multiple places. It will be good to add some comments at the class level.
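For context on the `addPartitions` comment above: inside the lambda, `partitions` resolves to the method parameter rather than the `partitions` field, so the loop appends to the very list it is iterating and never updates the field. A standalone sketch of the bug and the reviewer's suggested fix (class and method names are illustrative, not the PR's code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ShadowedAddDemo {
    private final Set<Integer> partitions = new HashSet<>();

    // Buggy shape from the diff: the lambda's "partitions" is the method
    // parameter, so this mutates the list being iterated and leaves the
    // field untouched.
    void addPartitionsBuggy(List<Integer> partitions) {
        partitions.forEach(partition -> partitions.add(partition));
    }

    // Reviewer's suggested fix: qualify the field and use addAll.
    void addPartitionsFixed(List<Integer> partitions) {
        this.partitions.addAll(partitions);
    }

    Set<Integer> partitions() {
        return partitions;
    }
}
```

Calling the buggy variant with a mutable `ArrayList` typically fails fast with a `ConcurrentModificationException`, since `ArrayList.forEach` detects the structural modification.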
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -259,8 +262,49 @@ public T records() {
         }
     }
 
+    public static final class IdError {

Review comment:
   Since we have session ids and topic ids in the context of a fetch request, we should probably qualify `TopicId`

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
         .setResponses(topicResponseList);
 }
 
+    private Boolean supportsTopicIds() {
+        return data.responses().stream().findFirst().filter(
+            topic -> !topic.topicId().equals(Uuid.ZERO_UUID)).isPresent();
+    }
+
+    public Set<Uuid> topicIds() {
+        return data.responses().stream().map(resp -> resp.topicId()).filter(id -> !id.equals(Uuid.ZERO_UUID)).collect(Collectors.toSet());

Review comment:
   This suggests we can have a combination of zero and non-zero?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -312,26 +417,45 @@ private String partitionsToLogString(Collection<TopicPartition> partitions) {
     return ret;
 }
 
+    static Set<Uuid> findMissingId(Set<Uuid> toFind, Set<Uuid> toSearch) {

Review comment:
   Could just parameterize `findMissing`?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
     }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+  val partition: Int,
+  var maxBytes: Int,
+  var fetchOffset: Long,
+  var leaderEpoch: Optional[Integer],
+  var fetcherLogStartOffset: Long,
+  var lastFetchedEpoch: Optional[Integer]) {

Review comment:
   Does an unresolved partition have all these fields populated? Or do we have it here because the topic may be resolved later?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.
+        else if (topicNames.get(partition.topicId) != null) {
+          val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition)
+          val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData)
+          partitionMap.add(newCp)
+          added.add(newTp)
+          unresolvedIterator.remove()
+        } else {
+          val idError = fetchDataAndError.idErrors.get(partition.topicId)
+          if (idError == null) {
+            fetchDataAndError.idErrors.put(partition.topicId, new FetchResponse.IdError(partition.topicId, Collections.singletonList(partition.partition), error))
+          } else {
+            idError.addPartitions(Collections.singletonList(partition.partition))
+          }
+        }
+      }
+
+      // We will also want to check topic ID here to see if the request matches what we have "on file".
+      // 1. If the current ID in a cached partition is Uuid.ZERO_UUID, and we have a valid
+      //    ID in topic IDs, simply add the ID. If there is not a valid ID, keep as Uuid.ZERO_UUID.
+      // 2. If we have an situation where there is a valid ID on the partition, but it does not match
+      //    the ID in topic IDs (likely due to topic deletion and recreation) or there is no valid topic
+      //    ID on the broker (topic deleted or broker received a metadataResponse without IDs),
+      //    remove the cached partition from partitionMap.
+      /**val partitionIterator = partitionMap.iterator()

Review comment:
   Is this part intentionally commented out?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
         .setResponses(topicResponseList);
 }
 
+    private Boolean supportsTopicIds() {

Review comment:
   Shouldn't we be using versions and expect non-zero ids in new versions?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +241,37 @@ public String toString() {
      * incremental fetch requests (see below).
      */
     private LinkedHashMap<TopicPartition, PartitionData> next;
+    private Map<String, Uuid> topicIds;
+    private Map<Uuid, String> topicNames;
+    private Map<String, Integer> partitionsPerTopic;
     private final boolean copySessionPartitions;
 
     Builder() {
         this.next = new LinkedHashMap<>();
+        this.topicIds = new HashMap<>();
+        this.topicNames = new HashMap<>();
+        this.partitionsPerTopic = new HashMap<>();
         this.copySessionPartitions = true;
     }
 
     Builder(int initialSize, boolean copySessionPartitions) {
         this.next = new LinkedHashMap<>(initialSize);
+        this.topicIds = new HashMap<>(initialSize);
+        this.topicNames = new HashMap<>(initialSize);
+        this.partitionsPerTopic = new HashMap<>(initialSize);
         this.copySessionPartitions = copySessionPartitions;
     }
 
     /**
      * Mark that we want data from this partition in the upcoming fetch.
      */
-    public void add(TopicPartition topicPartition, PartitionData data) {
-        next.put(topicPartition, data);
+    public void add(TopicPartition topicPartition, Uuid id, PartitionData data) {
+        if (next.put(topicPartition, data) == null)
+            partitionsPerTopic.merge(topicPartition.topic(), 1, (prev, next) -> prev + next);

Review comment:
   We can use `Integer::sum` as the last arg, but do we even need to maintain `partitionsPerTopic`?
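On the `Integer::sum` suggestion above: `(prev, next) -> prev + next` and the method reference `Integer::sum` are interchangeable as the remapping function of `Map.merge`, which inserts the given value on first sight of a key and otherwise combines the old value with it. A small illustrative sketch (not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;

class MergeCounterDemo {
    // Count occurrences per topic; merge() puts 1 for an absent key and
    // otherwise applies Integer.sum(oldCount, 1).
    static Map<String, Integer> countTopics(String[] topics) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : topics)
            partitionsPerTopic.merge(topic, 1, Integer::sum); // same as (prev, next) -> prev + next
        return partitionsPerTopic;
    }

    public static void main(String[] args) {
        System.out.println(countTopics(new String[]{"a", "a", "b"})); // prints {a=2, b=1}
    }
}
```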
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
         .setResponses(topicResponseList);
 }
 
+    private Boolean supportsTopicIds() {
+        return data.responses().stream().findFirst().filter(
+            topic -> !topic.topicId().equals(Uuid.ZERO_UUID)).isPresent();

Review comment:
   Does one non-zero id mean we have all ids?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org