junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651939380



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -239,12 +245,22 @@ class FetchSession(val id: Int,
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
              toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
+    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+      // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID.
+      // If the topic already existed, check that its ID is consistent.
+      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+      val newCachedPart = new CachedPartition(topicPart, id, reqData)
+      if (id != Uuid.ZERO_UUID) {
+        val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)
+        if (prevSessionTopicId != null && prevSessionTopicId != id)
+          inconsistentTopicIds.add(topicPart)

Review comment:
       If we are switching from version 12 to version 13 for a session, 
prevSessionTopicId will be null. Should we also populate inconsistentTopicIds 
in this case to force a new session in the client?
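
       A minimal sketch of the check being asked about, written as standalone Java rather than against the real FetchSession class. The class name SessionTopicIdSketch, the sessionTopics set, and the use of java.util.UUID in place of Kafka's Uuid are all assumptions made for illustration. It treats a topic that is already in the session without an ID, and then arrives with one, as inconsistent:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the session state; not the actual FetchSession code.
public class SessionTopicIdSketch {
    // Stand-in for Uuid.ZERO_UUID (assumption: java.util.UUID replaces Kafka's Uuid here).
    public static final UUID ZERO = new UUID(0L, 0L);

    private final Map<String, UUID> sessionTopicIds = new HashMap<>();
    // Hypothetical: every topic the session has seen, with or without an ID.
    private final Set<String> sessionTopics = new HashSet<>();

    /** Returns true if the request's ID for this topic conflicts with the session. */
    public boolean updateAndCheck(String topic, UUID requestId) {
        boolean alreadyInSession = !sessionTopics.add(topic);
        if (requestId.equals(ZERO)) {
            // As in the quoted diff, a zero ID is not recorded or compared.
            return false;
        }
        UUID prev = sessionTopicIds.put(topic, requestId);
        if (prev == null) {
            // No ID was recorded before. If the topic was already in the session,
            // the session started without IDs (for example, an older request version).
            return alreadyInSession;
        }
        return !prev.equals(requestId);
    }
}
```

       With this shape, a topic that is new to the session is accepted, while a topic first fetched without an ID and later fetched with one is reported as inconsistent.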

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +268,63 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private boolean missingTopicIds;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds do not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
       Got it.  We can keep the code as it is then.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+        else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+        else
+          Map.empty
       } else {
-        fetchResponse.responseData.asScala
+        fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
       }
     } catch {
+      case unknownId: UnknownTopicIdException =>
+        throw unknownId
+      case sessionUnknownId: FetchSessionTopicIdException =>
+        throw sessionUnknownId

Review comment:
       Got it. Could we clean the existing code up a bit? Since 
fetchSessionHandler.handleResponse() already handles the closing of the session 
on error, it seems that we could get rid of fetchSessionHandler.handleError(t). 
Also, it seems that if fetchResponse.error() != Errors.NONE, we want to throw 
the error as an exception. Finally, if fetchSessionHandler.handleResponse() 
returns false, we probably want to throw an exception too?
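
       A rough sketch of the control flow suggested here, using standalone Java with hypothetical stand-in types (FetchError, FetchFailedException, and the process method are illustrative names, not Kafka APIs). The sessionAccepted flag stands for the result of fetchSessionHandler.handleResponse(), which is assumed to have already run and closed the session if needed:

```java
import java.util.Map;

// Hypothetical illustration only; the real code lives in ReplicaFetcherThread.
public class FetchResponseHandlingSketch {
    // Stand-in for the relevant values of Kafka's Errors enum.
    public enum FetchError { NONE, UNKNOWN_TOPIC_ID, FETCH_SESSION_TOPIC_ID_ERROR }

    // Stand-in exception type (assumption).
    public static class FetchFailedException extends RuntimeException {
        public FetchFailedException(String message) {
            super(message);
        }
    }

    /**
     * Returns the response data only when the response carries no top-level
     * error and the session handler accepted it; otherwise throws.
     */
    public static Map<String, String> process(FetchError topLevelError,
                                              boolean sessionAccepted,
                                              Map<String, String> data) {
        if (topLevelError != FetchError.NONE) {
            // Any top-level error is surfaced as an exception.
            throw new FetchFailedException("Fetch failed with error " + topLevelError);
        }
        if (!sessionAccepted) {
            // The handler rejected the response even though no error code was set.
            throw new FetchFailedException("Fetch session handler rejected the response");
        }
        return data;
    }
}
```

       Under this shape the caller no longer needs per-error branches or a separate handleError call in the catch block; whether that fits the surrounding retry logic would need to be checked against the actual class.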

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    public Map<Uuid, String> sessionTopicNames() {
+        return sessionTopicNames;
+    }
+
+    private boolean canUseTopicIds = false;

Review comment:
       > I don't think we can calculate on a request basis since we may respond 
with topics that did not have IDs associated.
   
   I added another comment in FetchSession. If the session starts with no 
topicId and a fetch request switches to using topicId, could the server just 
return an error to force a new session? Would this avoid the need to track 
canUseTopicIds as state? Overall, it's probably better to add a bit of 
complexity on the server to simplify development on the client, since the 
client is implemented multiple times in different languages.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                                         fetchTarget.id());
                                 return;
                             }
-                            if (!handler.handleResponse(response)) {
+                            if (!handler.handleResponse(response, maxVersion)) {

Review comment:
       Yes, I think that works.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                                         fetchTarget.id());
                                 return;
                             }
-                            if (!handler.handleResponse(response)) {
+                            if (!handler.handleResponse(response, maxVersion)) {
+                                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == Errors.UNKNOWN_TOPIC_ID) {
+                                    metadata.requestUpdate();

Review comment:
       Thanks. Sounds good.



