jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572472724
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL - fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + + // Only make changes to topic IDs if we have a new request version. + // If we receive an old request version, ignore all topic ID code, keep IDs that are there. + if (version >= 13) { Review comment: I think we do since this is looking at the partitions cached in the session. I'll take another look though. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org