junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660016282
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -193,18 +197,22 @@ class CachedPartition(val topic: String, * Each fetch session is protected by its own lock, which must be taken before mutable * fields are read or modified. This includes modification of the session partition map. * - * @param id The unique fetch session ID. - * @param privileged True if this session is privileged. Sessions crated by followers - * are privileged; sesssion created by consumers are not. - * @param partitionMap The CachedPartitionMap. - * @param creationMs The time in milliseconds when this session was created. - * @param lastUsedMs The last used time in milliseconds. This should only be updated by - * FetchSessionCache#touch. - * @param epoch The fetch session sequence number. + * @param id The unique fetch session ID. + * @param privileged True if this session is privileged. Sessions crated by followers + * are privileged; session created by consumers are not. + * @param partitionMap The CachedPartitionMap. + * @param usesTopicIds True if this session is using topic IDs + * @param sessionTopicIds The mapping from topic name to topic ID for topics in the session. + * @param creationMs The time in milliseconds when this session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * FetchSessionCache#touch. + * @param epoch The fetch session sequence number. */ class FetchSession(val id: Int, val privileged: Boolean, val partitionMap: FetchSession.CACHE_MAP, + val usesTopicIds: Boolean, + val sessionTopicIds: FetchSession.TOPIC_ID_MAP, Review comment: Do we need to include the new fields in toString()? 
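On the toString() question: the sketch below shows one way the two new fields could appear in the string form. It is not the Kafka class. `SketchFetchSession` is a hypothetical, trimmed-down stand-in that keeps only a few of FetchSession's fields, `java.util.UUID` substitutes for Kafka's `Uuid`, and the output format is an assumption.

```scala
import java.util.UUID
import scala.jdk.CollectionConverters._

// Hypothetical, trimmed-down stand-in for kafka.server.FetchSession. Only a few
// fields are kept; java.util.UUID replaces org.apache.kafka.common.Uuid.
class SketchFetchSession(val id: Int,
                         val privileged: Boolean,
                         val usesTopicIds: Boolean,
                         val sessionTopicIds: java.util.Map[String, UUID],
                         val creationMs: Long,
                         var lastUsedMs: Long,
                         var epoch: Int) {
  // Take the session lock, as reads of mutable fields require, then include the new fields.
  override def toString: String = synchronized {
    // Sort by topic name so the output does not depend on HashMap iteration order.
    val topicIds = sessionTopicIds.asScala.toSeq
      .sortBy(_._1)
      .map { case (topic, topicId) => s"$topic=$topicId" }
      .mkString("{", ", ", "}")
    s"SketchFetchSession(id=$id, privileged=$privileged, usesTopicIds=$usesTopicIds, " +
      s"sessionTopicIds=$topicIds, creationMs=$creationMs, lastUsedMs=$lastUsedMs, epoch=$epoch)"
  }
}

// Usage: a session holding one topic with a made-up ID.
val ids = new java.util.HashMap[String, UUID]()
ids.put("foo", new UUID(0L, 1L))
val session = new SketchFetchSession(1, false, true, ids, 0L, 0L, 0)
println(session)
```

Printing every topic ID could get long for sessions with many topics; logging only the map size is an equally plausible choice.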
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -231,20 +239,31 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset) + Option(partitionMap.find(new CachedPartition(topicPartition, + sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) } type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL + val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. + // If the topic already existed, check that its ID is consistent. + val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) + val newCachedPart = new CachedPartition(topicPart, id, reqData) + if (id != Uuid.ZERO_UUID) { + val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id) Review comment: It seems that we should never change the topicId in sessionTopicIds? Perhaps we should use putIfAbsent. Similarly, if the topicId changes, I am not sure if we should update partitionMap below. 
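A minimal sketch of the putIfAbsent suggestion, under stated assumptions: `recordTopicId` is a hypothetical helper (not part of Kafka), `java.util.UUID` stands in for `Uuid`, and `ZeroId` models `Uuid.ZERO_UUID`, which the diff uses to mean "no topic ID known".

```scala
import java.util.UUID

// Stand-in for Uuid.ZERO_UUID ("no topic ID known on the broker").
val ZeroId = new UUID(0L, 0L)

// Hypothetical helper: records a topic ID with putIfAbsent, so an existing entry
// is never overwritten. Returns false only when a non-zero ID conflicts with the
// ID already stored for the topic.
def recordTopicId(sessionTopicIds: java.util.Map[String, UUID], topic: String, id: UUID): Boolean =
  if (id == ZeroId) true // nothing to record or compare
  else {
    // putIfAbsent returns the previous value, or null if the topic was new.
    val previous = sessionTopicIds.putIfAbsent(topic, id)
    previous == null || previous == id
  }

val sessionTopicIds = new java.util.HashMap[String, UUID]()
val originalId = new UUID(0L, 1L)
val recreatedId = new UUID(0L, 2L)
val added = recordTopicId(sessionTopicIds, "foo", originalId)        // new to the session
val unchanged = recordTopicId(sessionTopicIds, "foo", originalId)    // same ID again
val conflicting = recordTopicId(sessionTopicIds, "foo", recreatedId) // topic recreated with a new ID
```

Because putIfAbsent leaves the stored value alone, the session keeps the first ID it saw. A false result is where a caller could skip updating partitionMap and flag the partition with INCONSISTENT_TOPIC_ID instead; that wiring is an assumption and is not shown here.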
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -471,16 +505,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") - FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) + FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { + var error = Errors.NONE Review comment: error => topLevelError? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") - FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) + FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { + var error = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent + // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID Review comment: Ok, this is fine. I was thinking that when topicId changes, a pending fetch request could still reference the outdated Partition object and therefore miss the topicId change. This is unlikely and can be tightened up by clearing the segment list when a partition is deleted.
Regarding the metadata propagation, it's true that right now, we propagate the LeaderAndIsrRequest before the UpdateMetadataRequest. With Raft, the topicId will always flow through metadata update first, followed by the ReplicaManager. When we get there, maybe we could simplify the logic a bit. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadata The request metadata. + * @param version The version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { - fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { + fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { - FetchResponse.sizeOf(versionId, updates.entrySet.iterator) + FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { - def createNewSession: FetchSession.CACHE_MAP = { + var error = Errors.NONE + def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + val sessionTopicIds = new util.HashMap[String, Uuid](updates.size) updates.forEach { (part, respData) => + if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID val reqData = fetchData.get(part) - cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) + val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) + cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) + if (id != Uuid.ZERO_UUID) + sessionTopicIds.put(part.topic, id) } - cachedPartitions + (cachedPartitions, sessionTopicIds) } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, - updates.size, () => createNewSession) + updates.size, version, () => createNewSession) debug(s"Full fetch context with session id $responseSessionId returning " + s"${partitionsToLogString(updates.keySet)}") - FetchResponse.of(Errors.NONE, 0, responseSessionId, updates) + FetchResponse.of(error, 0, responseSessionId, updates, topicIds) Review comment: Typically, if there is a top-level error, we set the same error in every partition through FetchRequest.getErrorResponse(). Should we do the same thing here? Ditto for IncrementalFetchContext.updateAndGenerateResponseData(). ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadata The request metadata. + * @param version The version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower.
*/ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { - fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { + fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { - FetchResponse.sizeOf(versionId, updates.entrySet.iterator) + FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { - def createNewSession: FetchSession.CACHE_MAP = { + var error = Errors.NONE Review comment: Should we rename error to topLevelError to make it clearer? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadata The request metadata. + * @param version The version of the request, * @param fetchData The partition data from the fetch request. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. 
*/ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, + private val version: Short, Review comment: Should we use useTopicId instead of version? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") - FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) + FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { + var topLevelError = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent + // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + val entry = partitionIter.next() + if (entry.getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { Review comment: > I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change" My understanding is that the log is the source of truth and we will either read from the log if it matches and not read if it doesn't. I see we could get an error erroneously if the partition didn't update in time, but I don't see us being able to read from the log due to a stale partition. > > Or are you referring to the getPartitionOrException(tp) call picking up a stale partition and both the request and the partition are stale? In this case, we will read from the log, but will identify it with its correct ID. The client will handle based on this. 
A fetch request may have passed the topicId check in ReplicaManager and be about to call log.read() when the topicId changes. I was wondering whether, in that case, log.read() could return data that corresponds to the old topicId. It seems that's not possible, since Log.close() closes all segments. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -354,38 +377,55 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param cache The fetch session cache. * @param reqMetadata The request metadata. * @param fetchData The partition data from the fetch request. + * @param usesTopicIds True if this session should use topic IDs. + * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val usesTopicIds: Boolean, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { override def getFetchOffset(part: TopicPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { - fetchData.forEach(fun(_, _)) + override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { + fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { - FetchResponse.sizeOf(versionId, updates.entrySet.iterator) + FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { - def createNewSession: FetchSession.CACHE_MAP = { + var topLevelError = Errors.NONE + def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) + val sessionTopicIds = new util.HashMap[String, Uuid](updates.size) updates.forEach { (part, respData) => + if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { + info(s"Session encountered an inconsistent topic ID for topicPartition $part.") + topLevelError = Errors.INCONSISTENT_TOPIC_ID + } val reqData = fetchData.get(part) - cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) + val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) + cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) + if (id != Uuid.ZERO_UUID) + sessionTopicIds.put(part.topic, id) } - cachedPartitions + (cachedPartitions, sessionTopicIds) } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, - updates.size, () => createNewSession) - debug(s"Full fetch context with session id $responseSessionId returning " + - s"${partitionsToLogString(updates.keySet)}") - FetchResponse.of(Errors.NONE, 0, responseSessionId, updates) + updates.size, usesTopicIds, () => createNewSession) + if (topLevelError == Errors.INCONSISTENT_TOPIC_ID) { Review comment: It is also a bit confusing that we are treating INCONSISTENT_TOPIC_ID differently from other top-level errors. Since the only possible top-level error is INCONSISTENT_TOPIC_ID, perhaps we can change topLevelError to hasInconsistentTopicId. Ditto in IncrementalFetchContext. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -296,11 +276,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { // may not be any partitions at all in the response. For this reason, the top-level error code // is essential for them.
Errors error = Errors.forException(e); - LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>(); - for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) { - responseData.put(entry.getKey(), FetchResponse.partitionResponse(entry.getKey().partition(), error)); + List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>(); + // Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients), + // we can skip returning the error on all partitions and returning any partitions at all. + if (error != Errors.UNKNOWN_TOPIC_ID) { Review comment: This kind of special treatment for UNKNOWN_TOPIC_ID is a bit weird. If you look at the comment above, the reason for setting the same error code in all partitions is for backward compatibility when we don't have a top level error code. So, we probably can just check the request version. If version is >=13, we just always return a top level error code with no partitions.
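The version check suggested in the FetchRequest.getErrorResponse() comment could look roughly like this. It is a hedged sketch in Scala rather than the Java of FetchRequest. The case classes are hypothetical simplifications of the FetchResponseData types. The cutoff of version 13 comes from the review comment. Setting the top-level code for older versions as well is an assumption.

```scala
// Hypothetical, simplified response model; these are not Kafka's FetchResponseData classes.
final case class PartitionError(partition: Int, errorCode: Short)
final case class TopicError(topic: String, partitions: Seq[PartitionError])
final case class ErrorResponse(topLevelErrorCode: Short, topics: Seq[TopicError])

// Per the review comment, requests at version 13 and above get only a top-level error.
val MinTopLevelOnlyVersion: Short = 13

// Hypothetical sketch of a version-gated error response; fetchData maps topic name -> partitions.
def errorResponse(version: Short, errorCode: Short, fetchData: Map[String, Seq[Int]]): ErrorResponse =
  if (version >= MinTopLevelOnlyVersion) {
    // Newer clients read the top-level error code, so no partitions are returned.
    ErrorResponse(errorCode, Seq.empty)
  } else {
    // For backward compatibility, repeat the error in every requested partition.
    val topics = fetchData.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      TopicError(topic, partitions.map(p => PartitionError(p, errorCode)))
    }
    ErrorResponse(errorCode, topics)
  }

val request = Map("foo" -> Seq(0, 1))
val newer = errorResponse(13.toShort, 3.toShort, request)
val older = errorResponse(12.toShort, 3.toShort, request)
```

Gating on the request version keeps the decision independent of which error occurred, so UNKNOWN_TOPIC_ID needs no special case.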