rajinisivaram commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r566449639



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    /**
+     * The number of partitions for all topics which exist in the fetch request session.
+     */
+    private Map<String, Integer> sessionPartitionsPerTopic = new HashMap<>(0);
+
+    public Map<Uuid, String> getSessionTopicNames() {

Review comment:
       We don't use `get` prefix for getters

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Map<String, Uuid> topicIds() {
+        return cache.topicIds();
+    }
+
+    public synchronized Map<Uuid, String> topicNames() {
+        return cache.topicNames();
+    }

Review comment:
       There are several places where we use this combination of two maps, should we create a class that maintains a bidirectional map?
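
For illustration, here is a minimal sketch of what such a bidirectional map class might look like. The class name is hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` so the sketch is self-contained; the real class would live alongside `MetadataCache` and expose the same `topicIds()`/`topicNames()` views.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: keeps the name->id and id->name maps consistent on every update.
public final class TopicIdMapping {
    private final Map<String, UUID> nameToId = new HashMap<>();
    private final Map<UUID, String> idToName = new HashMap<>();

    public void put(String name, UUID id) {
        UUID oldId = nameToId.put(name, id);
        if (oldId != null && !oldId.equals(id))
            idToName.remove(oldId);           // drop the stale reverse entry
        String oldName = idToName.put(id, name);
        if (oldName != null && !oldName.equals(name))
            nameToId.remove(oldName);         // drop the stale forward entry
    }

    public UUID idForName(String name) {
        return nameToId.get(name);
    }

    public String nameForId(UUID id) {
        return idToName.get(id);
    }

    public Map<String, UUID> topicIds() {
        return Collections.unmodifiableMap(nameToId);
    }

    public Map<UUID, String> topicNames() {
        return Collections.unmodifiableMap(idToName);
    }
}
```

Callers would then pass one object instead of the pair of maps.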

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -254,8 +255,14 @@ public synchronized int sendFetches() {
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
+            final short maxVersion;
+            if (ApiKeys.FETCH.latestVersion() >= 13 && !data.canUseTopicIds()) {

Review comment:
       The fact that you are running this code implies `ApiKeys.FETCH.latestVersion() >= 13`?
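
If that implication holds, a sketch of the simplification: version selection reduces to the `canUseTopicIds()` flag alone. The constants below are stand-ins for `ApiKeys.FETCH.latestVersion()` and the last pre-topic-ID version, so this compiles on its own; the method name is hypothetical.

```java
// Sketch assuming this branch is only reachable when the latest FETCH version
// already supports topic IDs, making the >= 13 check redundant.
public final class FetchVersionSelection {
    static final short LATEST_FETCH_VERSION = 13;           // stand-in for ApiKeys.FETCH.latestVersion()
    static final short LAST_VERSION_WITHOUT_TOPIC_IDS = 12; // stand-in for the last pre-KIP-516 version

    static short maxVersion(boolean canUseTopicIds) {
        return canUseTopicIds ? LATEST_FETCH_VERSION : LAST_VERSION_WITHOUT_TOPIC_IDS;
    }
}
```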

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -347,9 +432,18 @@ public int sessionId() {
                 partitionResponses.add(partitionData.partitionResponse);
                topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
                        .setTopic(entry.getKey().topic())
+                        .setTopicId(topicIds.getOrDefault(entry.getKey().topic(), Uuid.ZERO_UUID))
                         .setPartitionResponses(partitionResponses));
             }
         });
+        // ID errors will be empty unless topic IDs are supported and there were topic ID errors

Review comment:
       It will be good to see if we can separate out new and old forms of FetchRequest/Response. It is not a big deal since it is just wrapping the protocol layer.

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here and replace if a new topic ID is added
+        // or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID
+                newTopicIds.remove(partition.topic());

Review comment:
       Can we end up with cases with some topics with ids and some without?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
   }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+                      val partition: Int,

Review comment:
       nit: indentation

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -89,19 +111,40 @@ public FetchSessionHandler(LogContext logContext, int node) {
          */
         private final Map<TopicPartition, PartitionData> sessionPartitions;
 
+        /**
+         * All of the topic IDs for topics which exist in the fetch request.
+         */
+        private final Map<String, Uuid> topicIds;
+
+        /**
+         *  All of the topic names for the topic IDs which exist in the fetch request
+         */
+        private final Map<Uuid, String> topicNames;
+
         /**
          * The metadata to use in this fetch request.
          */
         private final FetchMetadata metadata;
 
+        /**
+         * The topics in this fetch request

Review comment:
       Comment needs updating?

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -113,7 +113,9 @@ object ApiVersion {
     // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
     KAFKA_2_8_IV0,
     // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
-    KAFKA_2_8_IV1
+    KAFKA_2_8_IV1,
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    KAFKA_2_8_IV2

Review comment:
       We need to remember to set this based on which version this is being merged to.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -259,8 +262,49 @@ public T records() {
         }
     }
 
+    public static final class IdError {
+        private final Uuid id;
+        private final Set<Integer> partitions;
+        private final Errors error;
+
+        public IdError(Uuid id,
+                List<Integer> partitions,
+                Errors error) {
+            this.id = id;
+            this.partitions = new HashSet<>(partitions);
+            this.error = error;
+        }
+
+        public Uuid id() {
+            return this.id;
+        }
+
+        public Set<Integer> partitions() {
+            return this.partitions;
+        }
+
+        public void addPartitions(List<Integer> partitions) {
+            partitions.forEach(partition -> {
+                partitions.add(partition);
+            });

Review comment:
       `this.partitions.addAll(partitions)`?
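
To spell out why: the lambda above adds each element back into the `List` parameter named `partitions` (which shadows the field), so it never touches the field and would throw `ConcurrentModificationException` on most list implementations. A sketch of the suggested fix, with a hypothetical class name and only the relevant field from the diff:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the corrected addPartitions: qualify the field with this so the
// elements land in the Set field rather than the List argument.
public final class IdErrorPartitions {
    private final Set<Integer> partitions = new HashSet<>();

    public void addPartitions(List<Integer> partitions) {
        this.partitions.addAll(partitions);
    }

    public Set<Integer> partitions() {
        return partitions;
    }
}
```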

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,7 +116,68 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    public static final class UnresolvedPartitions {

Review comment:
       The whole FetchRequest class is quite hard to follow without reading the KIP and looking at multiple places. It will be good to add some comments at the class level.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -259,8 +262,49 @@ public T records() {
         }
     }
 
+    public static final class IdError {

Review comment:
       Since we have session ids and topic ids in the context of a fetch request, we should probably qualify `TopicId`

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
                 .setResponses(topicResponseList);
     }
 
+    private Boolean supportsTopicIds() {
+        return data.responses().stream().findFirst().filter(
+            topic -> !topic.topicId().equals(Uuid.ZERO_UUID)).isPresent();
+    }
+
+    public Set<Uuid> topicIds() {
+        return data.responses().stream().map(resp -> resp.topicId()).filter(id -> !id.equals(Uuid.ZERO_UUID)).collect(Collectors.toSet());

Review comment:
       This suggests we can have a combination of zero and non-zero?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -312,26 +417,45 @@ private String partitionsToLogString(Collection<TopicPartition> partitions) {
         return ret;
     }
 
+    static Set<Uuid> findMissingId(Set<Uuid> toFind, Set<Uuid> toSearch) {

Review comment:
       Could just parameterize `findMissing`?
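
A sketch of what that parameterization might look like, so one generic method covers both the `TopicPartition` and `Uuid` variants (the enclosing class name is hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a single generic findMissing replacing the type-specific copies.
public final class FindMissing {
    static <T> Set<T> findMissing(Set<T> toFind, Set<T> toSearch) {
        Set<T> missing = new HashSet<>();
        for (T item : toFind) {
            if (!toSearch.contains(item))
                missing.add(item);   // present in toFind but absent from toSearch
        }
        return missing;
    }
}
```

`findMissingId` would then just be a call site of `findMissing` with `Set<Uuid>` arguments.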

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
   }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+                      val partition: Int,
+                      var maxBytes: Int,
+                      var fetchOffset: Long,
+                      var leaderEpoch: Optional[Integer],
+                      var fetcherLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer]) {

Review comment:
       Does an unresolved partition have all these fields populated? Or do we have it here because the topic may be resolved later?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.
+        else if (topicNames.get(partition.topicId) != null) {
+          val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition)
+          val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData)
+          partitionMap.add(newCp)
+          added.add(newTp)
+          unresolvedIterator.remove()
+        } else {
+          val idError = fetchDataAndError.idErrors.get(partition.topicId)
+          if (idError == null) {
+            fetchDataAndError.idErrors.put(partition.topicId, new FetchResponse.IdError(partition.topicId, Collections.singletonList(partition.partition), error))
+          } else {
+            idError.addPartitions(Collections.singletonList(partition.partition))
+          }
+        }
+      }
+
+      // We will also want to check topic ID here to see if the request matches what we have "on file".
+      // 1. If the current ID in a cached partition is Uuid.ZERO_UUID, and we have a valid
+      //    ID in topic IDs, simply add the ID. If there is not a valid ID, keep as Uuid.ZERO_UUID.
+      // 2. If we have a situation where there is a valid ID on the partition, but it does not match
+      //    the ID in topic IDs (likely due to topic deletion and recreation) or there is no valid topic
+      //    ID on the broker (topic deleted or broker received a metadataResponse without IDs),
+      //    remove the cached partition from partitionMap.
+      /**val partitionIterator = partitionMap.iterator()

Review comment:
       Is this part intentionally commented out?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
                 .setResponses(topicResponseList);
     }
 
+    private Boolean supportsTopicIds() {

Review comment:
       Shouldn't we be using versions and expect non-zero ids in new versions?
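
A sketch of the version-based check this hints at: decide topic-ID support from the request/response version rather than probing the data for a non-zero id. The constant is an assumption standing in for the first FETCH version that carries topic IDs (v13 per KIP-516, assuming that is where this lands); the class and method names are hypothetical.

```java
// Sketch: version-driven topic-ID support instead of inspecting response data.
public final class TopicIdSupport {
    static final short FIRST_VERSION_WITH_TOPIC_IDS = 13; // stand-in, per KIP-516

    static boolean supportsTopicIds(short version) {
        return version >= FIRST_VERSION_WITH_TOPIC_IDS;
    }
}
```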

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +241,37 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
+        private Map<Uuid, String> topicNames;
+        private Map<String, Integer> partitionsPerTopic;
         private final boolean copySessionPartitions;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
+            this.topicNames = new HashMap<>();
+            this.partitionsPerTopic = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
+            this.topicNames = new HashMap<>(initialSize);
+            this.partitionsPerTopic = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
-            next.put(topicPartition, data);
+        public void add(TopicPartition topicPartition, Uuid id, PartitionData data) {
+            if (next.put(topicPartition, data) == null)
+                partitionsPerTopic.merge(topicPartition.topic(), 1, (prev, next) -> prev + next);

Review comment:
       We can use Integer::sum as the last arg, but do we even need to maintain `partitionsPerTopic`?
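
A sketch of the `Integer::sum` suggestion, isolated in a hypothetical class so it is self-contained; the behavior is identical to the `(prev, next) -> prev + next` lambda in the diff:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: Integer::sum as the remapping function when counting partitions per topic.
public final class PartitionsPerTopic {
    private final Map<String, Integer> partitionsPerTopic = new HashMap<>();

    void addPartition(String topic) {
        partitionsPerTopic.merge(topic, 1, Integer::sum);
    }

    int count(String topic) {
        return partitionsPerTopic.getOrDefault(topic, 0);
    }
}
```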

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -358,6 +452,15 @@ public int sessionId() {
                 .setResponses(topicResponseList);
     }
 
+    private Boolean supportsTopicIds() {
+        return data.responses().stream().findFirst().filter(
+            topic -> !topic.topicId().equals(Uuid.ZERO_UUID)).isPresent();

Review comment:
       Does one non-zero id mean we have all ids?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

