junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r655561404



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var error = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+            error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
       The INCONSISTENT_TOPIC_ID check in ReplicaManager is not very precise, since the topicId could change immediately after the check. Another way to do this would be to validate the topicId in the session again when we generate the fetch response: we could pass the latest topicNameToId mapping from the metadata cache to updateAndGenerateResponseData(), and if a topicId differs from the one in the fetch session, generate a top-level INCONSISTENT_TOPIC_ID error. We could then get rid of the INCONSISTENT_TOPIC_ID check in ReplicaManager.
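
For example, something along these lines (just a sketch; the extra topicNameToId parameter and the re-validation step are hypothetical, and it assumes the usual scala.jdk.CollectionConverters import):
```scala
// In IncrementalFetchContext.updateAndGenerateResponseData(), with a new
// topicNameToId parameter filled in by the caller from the metadata cache:
val idChanged = session.partitionMap.iterator.asScala.exists { cached =>
  val latestId = topicNameToId.get(cached.topic)
  latestId != null && latestId != cached.topicId
}
if (idChanged) {
  // A topic ID changed after the session was populated: fail the whole fetch.
  FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, session.id,
    new FetchSession.RESP_MAP, Collections.emptyMap())
} else {
  // ... fall through to the existing epoch check and pruning logic ...
}
```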

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -80,19 +96,28 @@ public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData() {
+    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
         if (responseData == null) {
             synchronized (this) {
                 if (responseData == null) {
                     responseData = new LinkedHashMap<>();
-                    data.responses().forEach(topicResponse ->
+                    data.responses().forEach(topicResponse -> {
+                        String name;
+                        if (version < 13) {
+                            name = topicResponse.topic();
+                        } else {
+                            name = topicNames.get(topicResponse.topicId());
+                        }
+                        if (name != null) {
                             topicResponse.partitions().forEach(partition ->
-                                    responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
-                    );
+                                    responseData.put(new TopicPartition(name, partition.partitionIndex()), partition));
+                        }
+                    });
                 }
             }
         }
         return responseData;
+

Review comment:
       extra empty line.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -213,9 +289,24 @@ public FetchRequestData build() {
                 }
                 sessionPartitions = next;
                 next = null;
+                canUseTopicIds = partitionsWithoutTopicIds == 0;

Review comment:
       Could we do this once at the beginning of build()?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -362,7 +363,8 @@
     BROKER_ID_NOT_REGISTERED(102, "The given broker ID was not registered.", BrokerIdNotRegisteredException::new),
     INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new),
     INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
-    TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new);
+    TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
+    FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new);

Review comment:
       Is there a benefit to having FETCH_SESSION_TOPIC_ID_ERROR in addition to INCONSISTENT_TOPIC_ID? Could we just always use INCONSISTENT_TOPIC_ID?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+        else if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+        else if (fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID)
+          throw new InconsistentTopicIdException("There was a topic ID in the request that was inconsistent with the one in the logs.")
+        else
+          Map.empty
       } else {
-        fetchResponse.responseData.asScala
+        fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
       }
     } catch {
+      case topicIdError @ (_:UnknownTopicIdException | _:FetchSessionTopicIdException | _:InconsistentTopicIdException) =>

Review comment:
       > I think there are other errors that can occur when trying to send the 
request which is why we have fetchSessionHandler.handleError(t). But this all 
can probably be cleaned up a bit/improved so I will take a look.
   
   Then could we try/catch just leaderEndpoint.sendRequest and call 
fetchSessionHandler.handleError(t) on exception? This will make the code easier 
to understand.
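
Roughly (a sketch; it assumes the existing catch block rethrows after calling handleError, as today):
```scala
// Only the network call needs session-handler cleanup; response-level errors
// can then be handled explicitly without being funneled through handleError().
val clientResponse = try {
  leaderEndpoint.sendRequest(fetchRequest)
} catch {
  case t: Throwable =>
    fetchSessionHandler.handleError(t)
    throw t
}
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
```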

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +306,55 @@ public int maxBytes() {
         return data.maxBytes();
     }
 
-    public Map<TopicPartition, PartitionData> fetchData() {
-        return fetchData;
+    // For versions < 13, builds the partitionData map using only the FetchRequestData.
+    // For versions 13+, builds the partitionData map using both the FetchRequestData and a mapping of topic IDs to names.
+    // Throws UnknownTopicIdException for versions 13+ if the topic ID was unknown to the server.
+    public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException {
+        Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>();

Review comment:
       Since we cache fetchData before, perhaps we could cache it in the new 
implementation too? This will make it more consistent with FetchResponse. Ditto 
for toForget().
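
For example (sketched in Scala for consistency with the other snippets here; the real file is Java, where the same double-checked pattern as responseData() in FetchResponse would apply):
```scala
// Build the map once, on the first call that supplies the ID-to-name mapping,
// and reuse it on subsequent calls.
@volatile private var cachedFetchData: util.Map[TopicPartition, PartitionData] = _

def fetchData(topicNames: util.Map[Uuid, String]): util.Map[TopicPartition, PartitionData] = {
  if (cachedFetchData == null) {
    synchronized {
      if (cachedFetchData == null) {
        val data = new util.LinkedHashMap[TopicPartition, PartitionData]()
        // ... resolve each topic via topicNames for v13+ (throwing
        // UnknownTopicIdException for unknown IDs) or use the embedded
        // topic name for older versions, then populate `data` ...
        cachedFetchData = data
      }
    }
  }
  cachedFetchData
}
```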

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
##########
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, DefaultApiVersion, KAFKA_2_7_IV0, KAFKA_2_8_IV1, KAFKA_3_0_IV1}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 3
+  override def generateConfigs: Seq[KafkaConfig] = {
+    // Brokers should be at most 2 different IBP versions, but for more test coverage, three are used here.
+    Seq(
+      createConfig(0, KAFKA_2_7_IV0),
+      createConfig(1, KAFKA_2_8_IV1),
+      createConfig(2, KAFKA_3_0_IV1)
+    )
+  }
+
+  @Test
+  def testControllerOldIBP(): Unit = {
+    val topic = "topic"
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    // Ensure controller version < KAFKA_2_8_IV1, and then create a topic where leader of partition 0 is not the controller,
+    // leader of partition 1 is.
+    ensureControllerWithIBP(KAFKA_2_7_IV0)
+    assertEquals(0, controllerSocketServer.config.brokerId)
+    val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
+    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
+
+    assertEquals(1, partitionLeaders(0))
+    assertEquals(0, partitionLeaders(1))
+
+    val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes)
+    val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes)
+    producer.send(record1)
+    producer.send(record2)
+
+    consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))
+    val count = consumer.poll(Duration.ofMillis(5000)).count() + consumer.poll(Duration.ofMillis(5000)).count()
+    assertEquals(2, count)
+  }
+
+  @Test
+  def testControllerNewIBP(): Unit = {

Review comment:
       Could we share the common code between testControllerNewIBP() and testControllerOldIBP()?
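
For example (a sketch; the helper name and parameterization are made up, and per-test assertions could stay in the test bodies):
```scala
// Shared body: pin the controller to the given IBP, produce to both
// partitions, and verify a consumer sees both records.
private def verifyFetchWithControllerIBP(ibp: ApiVersion): Unit = {
  val topic = "topic"
  val producer = createProducer()
  val consumer = createConsumer()

  ensureControllerWithIBP(ibp)
  createTopic(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
  TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)

  producer.send(new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes))
  producer.send(new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes))

  consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))
  val count = consumer.poll(Duration.ofMillis(5000)).count() +
    consumer.poll(Duration.ofMillis(5000)).count()
  assertEquals(2, count)
}

@Test
def testControllerOldIBP(): Unit = verifyFetchWithControllerIBP(KAFKA_2_7_IV0)

@Test
def testControllerNewIBP(): Unit = verifyFetchWithControllerIBP(KAFKA_3_0_IV1)
```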

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -71,6 +79,14 @@ public FetchResponseData data() {
         return data;
     }
 
+    /**
+     * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`.
+     * Version 13 introduces topic IDs which can lead to a few new errors. If there is any unknown topic ID in the request, the
+     * response will contain a top-level UNKNOWN_TOPIC_ID error and UNKNOWN_TOPIC_ID errors on all the partitions.
+     * If a request's topic ID usage is inconsistent with the session, we will return a top level FETCH_SESSION_TOPIC_ID_ERROR error.
+     * We may also return INCONSISTENT_TOPIC_ID error as a top-level error as well as an error for a given partition when that partition in the session has a topic ID

Review comment:
       I thought that INCONSISTENT_TOPIC_ID is always a top-level error now?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -261,6 +267,21 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
     }.toSet
   }
 
+  def topicNamesToIds(): util.Map[String, Uuid] = {
+    metadataSnapshot.topicIds.asJava
+  }
+
+  def topicIdsToNames(): util.Map[Uuid, String] = {
+    metadataSnapshot.topicNames.asJava
+  }
+
+  /**
+   * This method returns a map from topic names to IDs and a map from topic IDs to names
+   */
+  def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
+    (topicNamesToIds(), topicIdsToNames())

Review comment:
       Could we first save metadataSnapshot to a local val and then derive both 
maps so that they can be consistent?
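
e.g. (a sketch):
```scala
def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
  // Read the snapshot reference once so both maps reflect the same state.
  val snapshot = metadataSnapshot
  (snapshot.topicIds.asJava, snapshot.topicNames.asJava)
}
```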

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+        else if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+        else if (fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID)
+          throw new InconsistentTopicIdException("There was a topic ID in the request that was inconsistent with the one in the logs.")
+        else

Review comment:
       Could we just use Errors.forCode() to translate the errorCode to an exception generically?
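
For example (a sketch; Errors.exception() returns null for NONE, hence the guard):
```scala
if (fetchResponse.error != Errors.NONE)
  // Translate the top-level error code into its exception generically.
  throw fetchResponse.error.exception()
else
  Map.empty
```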

##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -116,61 +122,85 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1)
     val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1)
     checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest1V12 = createFetchRequest(shuffledTopicPartitions1, version = 12)
+    val fetchResponse1V12 = sendFetchRequest(leaderId, fetchRequest1V12)
+    checkFetchResponse(shuffledTopicPartitions1, fetchResponse1V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 2. Same as 1, but shuffled again
     val shuffledTopicPartitions2 = random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages
     val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2)
     val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2)
     checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest2V12 = createFetchRequest(shuffledTopicPartitions2, version = 12)
+    val fetchResponse2V12 = sendFetchRequest(leaderId, fetchRequest2V12)
+    checkFetchResponse(shuffledTopicPartitions2, fetchResponse2V12, maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 3. Partition with message larger than the partition limit at the start of the list
     val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, partitionWithLargeMessage2) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition))
     val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
-    assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq)
-    val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData =>
-      records(partitionData).map(_.sizeInBytes).sum
-    }.sum
-    assertTrue(responseSize3 <= maxResponseBytes)
-    val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1)
-    assertEquals(Errors.NONE.code, partitionData3.errorCode)
-    assertTrue(partitionData3.highWatermark > 0)
-    val size3 = records(partitionData3).map(_.sizeInBytes).sum
-    assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes")
-    assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes")
-    assertTrue(maxPartitionBytes < FetchResponse.recordsSize(partitionData3))
+    val fetchRequest3V12 = createFetchRequest(shuffledTopicPartitions3, Map(partitionWithLargeMessage1 -> messagesPerPartition), 12)
+    val fetchResponse3V12 = sendFetchRequest(leaderId, fetchRequest3V12)
+    def evaluateResponse3(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions3, responseData.keySet.asScala.toSeq)
+      val responseSize = responseData.asScala.values.map { partitionData =>
+        records(partitionData).map(_.sizeInBytes).sum
+      }.sum
+      assertTrue(responseSize <= maxResponseBytes)
+      val partitionData = responseData.get(partitionWithLargeMessage1)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size3 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than $maxResponseBytes")
+      assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than $maxPartitionBytes")
+      assertTrue(maxPartitionBytes < partitionData.records.sizeInBytes)
+    }
+    evaluateResponse3(fetchResponse3)
+    evaluateResponse3(fetchResponse3V12, 12)
 
     // 4. Partition with message larger than the response limit at the start of the list
     val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, partitionWithLargeMessage1) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition))
     val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
-    assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq)
-    val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect {
-      case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+    val fetchRequest4V12 = createFetchRequest(shuffledTopicPartitions4, Map(partitionWithLargeMessage2 -> messagesPerPartition), 12)
+    val fetchResponse4V12 = sendFetchRequest(leaderId, fetchRequest4V12)
+    def evaluateResponse4(response: FetchResponse, version: Short = ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions4, responseData.keySet.asScala.toSeq)
+      val nonEmptyPartitions = responseData.asScala.toSeq.collect {
+        case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+      }
+      assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions)
+      val partitionData = responseData.get(partitionWithLargeMessage2)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size4 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes")
+      assertTrue(maxResponseBytes < partitionData.records.sizeInBytes)
     }
-    assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
-    val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2)
-    assertEquals(Errors.NONE.code, partitionData4.errorCode)
-    assertTrue(partitionData4.highWatermark > 0)
-    val size4 = records(partitionData4).map(_.sizeInBytes).sum
-    assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than $maxResponseBytes")
-    assertTrue(maxResponseBytes < FetchResponse.recordsSize(partitionData4))
+    evaluateResponse4(fetchResponse4)
+    evaluateResponse4(fetchResponse4V12, 12)
   }
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
     initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+    val topicIds = getTopicIds().asJava
+    val topicNames = topicIds.asScala.map(_.swap).asJava
     producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
-      Seq(topicPartition))).build(2)
+    val fetchRequest = FetchRequest.Builder.forConsumer(2, Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+      Seq(topicPartition)), topicIds).build(2)
     val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
-    val partitionData = fetchResponse.responseData.get(topicPartition)
+    val partitionData = fetchResponse.responseData(topicNames, 2).get(topicPartition)
     assertEquals(Errors.NONE.code, partitionData.errorCode)
+    //assertEquals(Errors.NONE.code, partitionData.errorCode)

Review comment:
       Should we remove this line? Ditto in a few other places in this file.

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -746,7 +753,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
 
-    sendRequests(requestKeyToRequest)
+    sendRequests(requestKeyToRequest, true)
+  }
+
+  /*
+   * even if the topic doesn't exist, request APIs should not leak the topic name
+   */
+  @Test
+  def testAuthorizationWithTopicNotExisting(): Unit = {
+    val id = Uuid.randomUuid()
+    val topicNames = Map(id -> "topic")
+    val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
+      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()),
+      ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
+      ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
+      ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
+      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
+      ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
+      ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
+      ApiKeys.ELECT_LEADERS -> electLeadersRequest
+    )
+
+    sendRequests(requestKeyToRequest, false, topicNames)

Review comment:
       Since this tests non-existing topics, why do we pass in topicNames for 
fetch requests?

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -154,17 +154,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       classOf[PrincipalBuilder].getName)
   }
 
-  val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors](
+  val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
       Errors.forCode(
         resp.data
           .responses.find(topic)
           .partitionResponses.asScala.find(_.index == part).get
           .errorCode
-      )      
+      )
     }),
-    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => Errors.forCode(resp.responseData.asScala.find(_._1 == tp).get._2.errorCode)),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => Errors.forCode(resp.responseData(topicNames.asJava, version).asScala.find(_._1 == tp).get._2.errorCode)),

Review comment:
       This is an existing issue, but could we use a `case` pattern to avoid the unnamed tuple references like `_._1`?
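
For example (a sketch, using a backquoted identifier in the pattern):
```scala
ApiKeys.FETCH -> ((resp: requests.FetchResponse) =>
  Errors.forCode(resp.responseData(topicNames.asJava, version).asScala.collectFirst {
    case (`tp`, partitionData) => partitionData.errorCode
  }.get))
```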

##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -226,6 +226,18 @@ class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
     _currentImage.partitions.numTopicPartitions(topic)
   }
 
+  override def topicNamesToIds(): util.Map[String, Uuid] = {
+    _currentImage.partitions.copyReverseIdMap()
+  }
+
+  override def topicIdsToNames(): util.Map[Uuid, String] = {
+    _currentImage.partitions.copyIdMap()
+  }
+
+  override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
+    (topicNamesToIds(), topicIdsToNames())

Review comment:
       Could we first save _currentImage to a local val and derive both maps 
from it so that they are consistent?
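
e.g. (a sketch):
```scala
override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
  // Read the volatile image once so both maps come from the same snapshot.
  val image = _currentImage
  (image.partitions.copyReverseIdMap(), image.partitions.copyIdMap())
}
```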

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    public Map<Uuid, String> sessionTopicNames() {
+        return sessionTopicNames;
+    }
+
+    private boolean canUseTopicIds = false;

Review comment:
       In the latest PR, it seems that canUseTopicIds is updated on every 
build() call and can be a local val?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -171,7 +173,8 @@ class CachedPartition(val topic: String,
         this.eq(that) ||
           (that.canEqual(this) &&
             this.partition.equals(that.partition) &&
-            this.topic.equals(that.topic))
+            this.topic.equals(that.topic) &&
+            this.topicId.equals(that.topicId))

Review comment:
       Should we add topicId in toString()?
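
e.g. (a sketch, assuming toString keeps its current shape):
```scala
override def toString: String = synchronized {
  "CachedPartition(topic=" + topic +
    ", topicId=" + topicId +
    ", partition=" + partition +
    // ... remaining fields as before ...
    ")"
}
```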



