junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651939380
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -239,12 +245,22 @@ class FetchSession(val id: Int,
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
              toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
+    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+      // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID.
+      // If the topic already existed, check that its ID is consistent.
+      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+      val newCachedPart = new CachedPartition(topicPart, id, reqData)
+      if (id != Uuid.ZERO_UUID) {
+        val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)
+        if (prevSessionTopicId != null && prevSessionTopicId != id)
+          inconsistentTopicIds.add(topicPart)

Review comment:
If we are switching from version 12 to version 13 for a session, prevSessionTopicId will be null. Should we also populate inconsistentTopicIds in this case to force a new session in the client?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +268,63 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private boolean missingTopicIds;
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds do not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
Got it. We can keep the code as it is then.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+        else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+        else
+          Map.empty
       } else {
-        fetchResponse.responseData.asScala
+        fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
       }
     } catch {
+      case unknownId: UnknownTopicIdException =>
+        throw unknownId
+      case sessionUnknownId: FetchSessionTopicIdException =>
+        throw sessionUnknownId

Review comment:
Got it. Could we clean the existing code up a bit? Since fetchSessionHandler.handleResponse() already handles the closing of the session on error, it seems that we could get rid of fetchSessionHandler.handleError(t). Also, it seems that if fetchResponse.error() != Errors.NONE, we want to throw the error as an exception. Finally, if fetchSessionHandler.handleResponse() returns false, we probably want to throw an exception too?

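A minimal sketch of the control flow suggested in the ReplicaFetcherThread comment above, written in plain Java with stand-in types so it runs without Kafka on the classpath. `ErrorCode`, `Response`, `SessionHandler`, `FetchFailedException`, and `process` are all hypothetical names for illustration, not the PR's or Kafka's API. The idea is only that the handler sees every response once, any top-level error is thrown, and a rejected response is also thrown rather than mapped to an empty result.

```java
import java.util.Collections;
import java.util.Map;

final class FetchResponseHandlingSketch {
    // Hypothetical stand-in for Kafka's Errors enum; only the values discussed here.
    enum ErrorCode { NONE, UNKNOWN_TOPIC_ID, FETCH_SESSION_TOPIC_ID_ERROR }

    // Hypothetical stand-in for a fetch response: a top-level error plus per-partition data.
    static final class Response {
        final ErrorCode error;
        final Map<String, Long> data;

        Response(ErrorCode error, Map<String, Long> data) {
            this.error = error;
            this.data = data;
        }
    }

    // Hypothetical stand-in for the session handler; false means the response was rejected.
    interface SessionHandler {
        boolean handleResponse(Response response);
    }

    // Hypothetical exception that carries the top-level error code.
    static final class FetchFailedException extends RuntimeException {
        final ErrorCode error;

        FetchFailedException(ErrorCode error) {
            super("Fetch failed with top-level error " + error);
            this.error = error;
        }
    }

    static Map<String, Long> process(Response response, SessionHandler handler) {
        // The handler sees every response, so it can close the session on error itself.
        boolean sessionOk = handler.handleResponse(response);
        // Any top-level error becomes an exception, not only the topic ID errors.
        if (response.error != ErrorCode.NONE)
            throw new FetchFailedException(response.error);
        // A rejected response without an error code is still treated as a failure.
        if (!sessionOk)
            throw new IllegalStateException("Fetch session handler rejected the response");
        return response.data;
    }

    public static void main(String[] args) {
        Response ok = new Response(ErrorCode.NONE, Collections.singletonMap("topic-0", 42L));
        System.out.println(process(ok, r -> true));
        try {
            process(new Response(ErrorCode.UNKNOWN_TOPIC_ID, Collections.<String, Long>emptyMap()), r -> false);
        } catch (FetchFailedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this shape the caller needs no per-error branches and no separate error callback on the handler; whether that matches what the existing catch block must still do for other exception types is left open here.
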
##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions = new LinkedHashMap<>(0);
+
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    public Map<Uuid, String> sessionTopicNames() {
+        return sessionTopicNames;
+    }
+
+    private boolean canUseTopicIds = false;

Review comment:
> I don't think we can calculate on a request basis since we may respond with topics that did not have IDs associated.

I added another comment in FetchSession. If the session starts with no topicId and a fetch request switches to using topicId, could the server just return an error to force a new session? Will this avoid the need to track canUseTopicIds as a state? Overall, it's probably a bit better to add a bit of complexity on the server to simplify the development on the client since we implement the client multiple times in different languages.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                             fetchTarget.id());
                         return;
                     }
-                    if (!handler.handleResponse(response)) {
+                    if (!handler.handleResponse(response, maxVersion)) {

Review comment:
Yes, I think that works.

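One way to read the server-side suggestion in the canUseTopicIds comment above, sketched in plain Java. This assumes the session remembers at creation time whether it uses topic IDs, which is an assumption made here for illustration and not something the PR states. `SessionTopicIdCheckSketch`, `Result`, and `update` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class SessionTopicIdCheckSketch {
    // Hypothetical result type; the error name mirrors the one quoted in the diff.
    enum Result { OK, FETCH_SESSION_TOPIC_ID_ERROR }

    // Assumption: fixed when the session is created and never changed afterwards.
    private final boolean usesTopicIds;
    private final Map<String, UUID> sessionTopicIds = new HashMap<>();

    SessionTopicIdCheckSketch(boolean usesTopicIds) {
        this.usesTopicIds = usesTopicIds;
    }

    Result update(boolean requestUsesTopicIds, Map<String, UUID> requestTopicIds) {
        // A request that switches ID usage mid-session is rejected, so the client
        // has to start a new session instead of tracking the switch as state.
        if (requestUsesTopicIds != usesTopicIds)
            return Result.FETCH_SESSION_TOPIC_ID_ERROR;
        for (Map.Entry<String, UUID> entry : requestTopicIds.entrySet()) {
            UUID previous = sessionTopicIds.putIfAbsent(entry.getKey(), entry.getValue());
            // The same topic name with a different ID is inconsistent with the session.
            // IDs recorded earlier in this loop are kept; the session is expected to be discarded.
            if (previous != null && !previous.equals(entry.getValue()))
                return Result.FETCH_SESSION_TOPIC_ID_ERROR;
        }
        return Result.OK;
    }

    public static void main(String[] args) {
        SessionTopicIdCheckSketch legacySession = new SessionTopicIdCheckSketch(false);
        Map<String, UUID> ids = Collections.singletonMap("foo", UUID.randomUUID());
        // A session created without topic IDs receives a request that carries them.
        System.out.println(legacySession.update(true, ids));
    }
}
```

Under this assumption the client only needs to react to the error by opening a new session, which is the simplification the comment is asking about.
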
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                             fetchTarget.id());
                         return;
                     }
-                    if (!handler.handleResponse(response)) {
+                    if (!handler.handleResponse(response, maxVersion)) {
+                        if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == Errors.UNKNOWN_TOPIC_ID) {
+                            metadata.requestUpdate();

Review comment:
Thanks. Sounds good.