junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r655561404
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") - FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) + FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { + var error = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent + // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID Review comment: The INCONSISTENT_TOPIC_ID check in ReplicaManager is not very precise since the topicId could change immediately after the check. I am thinking that another way to do this is to validate the topicId in the session again when we are generating the fetch response. We could pass in the latest topicNameToId mapping from the metadata Cache to updateAndGenerateResponseData(). If the topicId is different from those in the fetch session, we could generate a top level INCONSISTENT_TOPIC_ID error. We could then get rid of the INCONSISTENT_TOPIC_ID check in ReplicaManager. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -80,19 +96,28 @@ public Errors error() { return Errors.forCode(data.errorCode()); } - public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData() { + public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) { if (responseData == null) { synchronized (this) { if (responseData == null) { responseData = new LinkedHashMap<>(); - data.responses().forEach(topicResponse -> + data.responses().forEach(topicResponse -> { + String name; + if (version < 13) { + name = topicResponse.topic(); + } else { + name = topicNames.get(topicResponse.topicId()); + } + if (name != null) { topicResponse.partitions().forEach(partition -> - responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition)) - ); + responseData.put(new TopicPartition(name, partition.partitionIndex()), partition)); + } + }); } } } return responseData; + Review comment: extra empty line. ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -213,9 +289,24 @@ public FetchRequestData build() { } sessionPartitions = next; next = null; + canUseTopicIds = partitionsWithoutTopicIds == 0; Review comment: Could we do this once at the beginning of build()? ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ########## @@ -362,7 +363,8 @@ BROKER_ID_NOT_REGISTERED(102, "The given broker ID was not registered.", BrokerIdNotRegisteredException::new), INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new), INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new), - TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new); + TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new), + FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new); Review comment: Is there a benefit to have FETCH_SESSION_TOPIC_ID_ERROR in addition to INCONSISTENT_TOPIC_ID? Could we just always use INCONSISTENT_TOPIC_ID? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String, try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] - if (!fetchSessionHandler.handleResponse(fetchResponse)) { - Map.empty + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { + if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID) + throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.") + else if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) + throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.") + else if (fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) + throw new InconsistentTopicIdException("There was a topic ID in the request that was inconsistent with the one in the logs.") + else + Map.empty } else { - fetchResponse.responseData.asScala + fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala } } catch { + case topicIdError @ (_:UnknownTopicIdException | _:FetchSessionTopicIdException | _:InconsistentTopicIdException) => Review comment: > I think there are other errors that can occur when trying to send the request which is why we have fetchSessionHandler.handleError(t). But this all can probably be cleaned up a bit/improved so I will take a look. Then could we try/catch just leaderEndpoint.sendRequest and call fetchSessionHandler.handleError(t) on exception? This will make the code easier to understand. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -319,12 +306,55 @@ public int maxBytes() { return data.maxBytes(); } - public Map<TopicPartition, PartitionData> fetchData() { - return fetchData; + // For versions < 13, builds the partitionData map using only the FetchRequestData. + // For versions 13+, builds the partitionData map using both the FetchRequestData and a mapping of topic IDs to names. + // Throws UnknownTopicIdException for versions 13+ if the topic ID was unknown to the server. + public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException { + Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>(); Review comment: Since we cache fetchData before, perhaps we could cache it in the new implementation too? This will make it more consistent with FetchResponse. Ditto for toForget(). ########## File path: core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala ########## @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.server + +import java.time.Duration +import java.util.Arrays.asList + +import kafka.api.{ApiVersion, DefaultApiVersion, KAFKA_2_7_IV0, KAFKA_2_8_IV1, KAFKA_3_0_IV1} +import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import scala.collection.{Map, Seq} + +class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest { + + override def brokerCount: Int = 3 + override def generateConfigs: Seq[KafkaConfig] = { + // Brokers should be at most 2 different IBP versions, but for more test coverage, three are used here. + Seq( + createConfig(0, KAFKA_2_7_IV0), + createConfig(1, KAFKA_2_8_IV1), + createConfig(2, KAFKA_3_0_IV1) + ) + } + + @Test + def testControllerOldIBP(): Unit = { + val topic = "topic" + val producer = createProducer() + val consumer = createConsumer() + + // Ensure controller version < KAFKA_2_8_IV1, and then create a topic where leader of partition 0 is not the controller, + // leader of partition 1 is. + ensureControllerWithIBP(KAFKA_2_7_IV0) + assertEquals(0, controllerSocketServer.config.brokerId) + val partitionLeaders = createTopic(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1))) + TestUtils.waitForAllPartitionsMetadata(servers, topic, 2) + + assertEquals(1, partitionLeaders(0)) + assertEquals(0, partitionLeaders(1)) + + val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes) + val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes) + producer.send(record1) + producer.send(record2) + + consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1))) + val count = consumer.poll(Duration.ofMillis(5000)).count() + consumer.poll(Duration.ofMillis(5000)).count() + assertEquals(2, count) + } + + @Test + def testControllerNewIBP(): Unit = { Review comment: Could we share the common code btw testControllerNewIBP() and testControllerOldIBP()? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -71,6 +79,14 @@ public FetchResponseData data() { return data; } + /** + * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`. + * Version 13 introduces topic IDs which can lead to a few new errors. If there is any unknown topic ID in the request, the + * response will contain a top-level UNKNOWN_TOPIC_ID error and UNKNOWN_TOPIC_ID errors on all the partitions. + * If a request's topic ID usage is inconsistent with the session, we will return a top level FETCH_SESSION_TOPIC_ID_ERROR error. + * We may also return INCONSISTENT_TOPIC_ID error as a top-level error as well as an error for a given partition when that partition in the session has a topic ID Review comment: I thought that INCONSISTENT_TOPIC_ID is always a top level error now? ########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -261,6 +267,21 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { }.toSet } + def topicNamesToIds(): util.Map[String, Uuid] = { + metadataSnapshot.topicIds.asJava + } + + def topicIdsToNames(): util.Map[Uuid, String] = { + metadataSnapshot.topicNames.asJava + } + + /** + * This method returns a map from topic names to IDs and a map from topic IDs to names + */ + def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = { + (topicNamesToIds(), topicIdsToNames()) Review comment: Could we first save metadataSnapshot to a local val and then derive both maps so that they can be consistent? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String, try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] - if (!fetchSessionHandler.handleResponse(fetchResponse)) { - Map.empty + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { + if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID) + throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.") + else if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) + throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.") + else if (fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) + throw new InconsistentTopicIdException("There was a topic ID in the request that was inconsistent with the one in the logs.") + else Review comment: Could we just use Errors.forCode() to translate errorCode to exception generically? ########## File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ########## @@ -116,61 +122,85 @@ class FetchRequestTest extends BaseRequestTest { val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1) val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1) checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition) + val fetchRequest1V12 = createFetchRequest(shuffledTopicPartitions1, version = 12) + val fetchResponse1V12 = sendFetchRequest(leaderId, fetchRequest1V12) + checkFetchResponse(shuffledTopicPartitions1, fetchResponse1V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12) // 2. Same as 1, but shuffled again val shuffledTopicPartitions2 = random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2) val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2) checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition) + val fetchRequest2V12 = createFetchRequest(shuffledTopicPartitions2, version = 12) + val fetchResponse2V12 = sendFetchRequest(leaderId, fetchRequest2V12) + checkFetchResponse(shuffledTopicPartitions2, fetchResponse2V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12) // 3. Partition with message larger than the partition limit at the start of the list val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, partitionWithLargeMessage2) ++ random.shuffle(partitionsWithoutLargeMessages) val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition)) val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3) - assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq) - val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData => - records(partitionData).map(_.sizeInBytes).sum - }.sum - assertTrue(responseSize3 <= maxResponseBytes) - val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1) - assertEquals(Errors.NONE.code, partitionData3.errorCode) - assertTrue(partitionData3.highWatermark > 0) - val size3 = records(partitionData3).map(_.sizeInBytes).sum - assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes") - assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes") - assertTrue(maxPartitionBytes < FetchResponse.recordsSize(partitionData3)) + val fetchRequest3V12 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition), 12) + val fetchResponse3V12 = sendFetchRequest(leaderId, fetchRequest3V12) + def evaluateResponse3(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = { + val responseData = response.responseData(topicNames, version) + assertEquals(shuffledTopicPartitions3, responseData.keySet.asScala.toSeq) + val responseSize = responseData.asScala.values.map { partitionData => + records(partitionData).map(_.sizeInBytes).sum + }.sum + assertTrue(responseSize <= maxResponseBytes) + val partitionData = responseData.get(partitionWithLargeMessage1) + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertTrue(partitionData.highWatermark > 0) + val size3 = records(partitionData).map(_.sizeInBytes).sum + assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes") + assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes") + assertTrue(maxPartitionBytes < partitionData.records.sizeInBytes) + } + evaluateResponse3(fetchResponse3) + evaluateResponse3(fetchResponse3V12, 12) // 4. Partition with message larger than the response limit at the start of the list val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, partitionWithLargeMessage1) ++ random.shuffle(partitionsWithoutLargeMessages) val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition)) val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4) - assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq) - val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect { - case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp + val fetchRequest4V12 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition), 12) + val fetchResponse4V12 = sendFetchRequest(leaderId, fetchRequest4V12) + def evaluateResponse4(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = { + val responseData = response.responseData(topicNames, version) + assertEquals(shuffledTopicPartitions4, responseData.keySet.asScala.toSeq) + val nonEmptyPartitions = responseData.asScala.toSeq.collect { + case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp + } + assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions) + val partitionData = responseData.get(partitionWithLargeMessage2) + assertEquals(Errors.NONE.code, partitionData.errorCode) + assertTrue(partitionData.highWatermark > 0) + val size4 = records(partitionData).map(_.sizeInBytes).sum + assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes") + assertTrue(maxResponseBytes < partitionData.records.sizeInBytes) } - assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4) - val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2) - assertEquals(Errors.NONE.code, partitionData4.errorCode) - assertTrue(partitionData4.highWatermark > 0) - val size4 = records(partitionData4).map(_.sizeInBytes).sum - assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes") - assertTrue(maxResponseBytes < FetchResponse.recordsSize(partitionData4)) + evaluateResponse4(fetchResponse4) + evaluateResponse4(fetchResponse4V12, 12) } @Test def testFetchRequestV2WithOversizedMessage(): Unit = { initProducer() val maxPartitionBytes = 200 val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head + val topicIds = getTopicIds().asJava + val topicNames = topicIds.asScala.map(_.swap).asJava producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get - val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, - Seq(topicPartition))).build(2) + val fetchRequest = FetchRequest.Builder.forConsumer(2, Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, + Seq(topicPartition)), topicIds).build(2) val fetchResponse = sendFetchRequest(leaderId, fetchRequest) - val partitionData = fetchResponse.responseData.get(topicPartition) + val partitionData = fetchResponse.responseData(topicNames, 2).get(topicPartition) assertEquals(Errors.NONE.code, partitionData.errorCode) + //assertEquals(Errors.NONE.code, partitionData.errorCode) Review comment: Should we remove this line? Ditto in a few other places in this file. ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -746,7 +753,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) - sendRequests(requestKeyToRequest) + sendRequests(requestKeyToRequest, true) + } + + /* + * even if the topic doesn't exist, request APIs should not leak the topic name + */ + @Test + def testAuthorizationWithTopicNotExisting(): Unit = { + val id = Uuid.randomUuid() + val topicNames = Map(id -> "topic") + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), + ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()), + ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, + ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, + ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, + ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, + ApiKeys.DELETE_RECORDS -> deleteRecordsRequest, + ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, + ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, + ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, + ApiKeys.DELETE_GROUPS -> deleteGroupsRequest, + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, + ApiKeys.ELECT_LEADERS -> electLeadersRequest + ) + + sendRequests(requestKeyToRequest, false, topicNames) Review comment: Since this tests non-existing topics, why do we pass in topicNames for fetch requests? ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -154,17 +154,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { classOf[PrincipalBuilder].getName) } - val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors]( + val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => { Errors.forCode( resp.data .responses.find(topic) .partitionResponses.asScala.find(_.index == part).get .errorCode - ) + ) }), - ApiKeys.FETCH -> ((resp: requests.FetchResponse) => Errors.forCode(resp.responseData.asScala.find(_._1 == tp).get._2.errorCode)), + ApiKeys.FETCH -> ((resp: requests.FetchResponse) => Errors.forCode(resp.responseData(topicNames.asJava, version).asScala.find(_._1 == tp).get._2.errorCode)), Review comment: This is an existing issue, but could we use case to remove unnamed references _._1? ########## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ########## @@ -226,6 +226,18 @@ class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { _currentImage.partitions.numTopicPartitions(topic) } + override def topicNamesToIds(): util.Map[String, Uuid] = { + _currentImage.partitions.copyReverseIdMap() + } + + override def topicIdsToNames(): util.Map[Uuid, String] = { + _currentImage.partitions.copyIdMap() + } + + override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = { + (topicNamesToIds(), topicIdsToNames()) Review comment: Could we first save _currentImage to a local val and derive both maps from it so that they are consistent? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) { private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions = new LinkedHashMap<>(0); + /** + * All of the topic ids mapped to topic names for topics which exist in the fetch request session. + */ + private Map<String, Uuid> sessionTopicIds = new HashMap<>(0); + + /** + * All of the topic names mapped to topic ids for topics which exist in the fetch request session. + */ + private Map<Uuid, String> sessionTopicNames = new HashMap<>(0); + + public Map<Uuid, String> sessionTopicNames() { + return sessionTopicNames; + } + + private boolean canUseTopicIds = false; Review comment: In the latest PR, it seems that canUseTopicIds is updated on every build() call and can be a local val? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -171,7 +173,8 @@ class CachedPartition(val topic: String, this.eq(that) || (that.canEqual(this) && this.partition.equals(that.partition) && - this.topic.equals(that.topic)) + this.topic.equals(that.topic) && + this.topicId.equals(that.topicId)) Review comment: Should we add topicId in toString()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org