lucasbru commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2316984834
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -122,9 +232,11 @@ class DefaultAutoTopicCreationManager( override def onComplete(response: ClientResponse): Unit = { clearInflightRequests(creatableTopics) if (response.authenticationException() != null) { - warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception") + val authException = response.authenticationException() Review Comment: nit: let's not change this function at all; it's not related to what we are doing. ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -50,21 +52,121 @@ trait AutoTopicCreationManager { def createStreamsInternalTopics( topics: Map[String, CreatableTopic], - requestContext: RequestContext + requestContext: RequestContext, + expirationTimeMs: Long ): Unit + def getStreamsInternalTopicCreationErrors( + topicNames: Set[String], + currentTimeMs: Long + ): Map[String, String] + + def close(): Unit = {} + +} + +/** + * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time + * - Capacity: enforced by insertion-order removal (keeps the most recently inserted entries) + */ +class ExpiringErrorCache(maxSize: Int) { + + private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long) + + private val byTopic = new java.util.HashMap[String, Entry]() + private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] { + override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs) + }) + private val lock = new ReentrantLock() + + def put(topicName: String, errorMessage: String, expirationTimeMs: Long): Unit = { + lock.lock() + try { + val existing = byTopic.get(topicName) + if (existing != null) { + // Remove old instance from structures + expiryQueue.remove(existing) + } + + val entry = Entry(topicName, errorMessage, expirationTimeMs) + byTopic.put(topicName, entry) + expiryQueue.add(entry) + + // Enforce capacity by removing entries with earliest expiration time first + while (byTopic.size() > maxSize && !expiryQueue.isEmpty) { + val evicted = expiryQueue.poll() + if (evicted != null) { + val current = byTopic.get(evicted.topicName) + if (current != null && (current eq evicted)) { + byTopic.remove(evicted.topicName) + } + } + } + } finally { + lock.unlock() + } + } + + def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = { + lock.lock() + try { + cleanupExpired(currentTimeMs) + val result = mutable.Map.empty[String, String] + topicNames.foreach { topicName => + val entry = byTopic.get(topicName) + if (entry != null) { + result.put(topicName, entry.errorMessage) + } + } + result.toMap + } finally { + lock.unlock() + } + } + + def cleanupExpired(currentTimeMs: Long): Unit = { + lock.lock() + try { + while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) { + val expired = expiryQueue.poll() + 
val current = byTopic.get(expired.topicName) + if (current != null && (current eq expired)) { + byTopic.remove(expired.topicName) + } + } + } finally { + lock.unlock() + } + } + + def clear(): Unit = { + lock.lock() + try { + byTopic.clear() + expiryQueue.clear() + } finally { + lock.unlock() + } + } } + class DefaultAutoTopicCreationManager( config: KafkaConfig, channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, - shareCoordinator: ShareCoordinator + shareCoordinator: ShareCoordinator, + time: Time, + cacheCapacity: Int = 1000 Review Comment: nit: topicErrorCacheCapacity ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -50,21 +52,121 @@ trait AutoTopicCreationManager { def createStreamsInternalTopics( topics: Map[String, CreatableTopic], - requestContext: RequestContext + requestContext: RequestContext, + expirationTimeMs: Long ): Unit + def getStreamsInternalTopicCreationErrors( + topicNames: Set[String], + currentTimeMs: Long + ): Map[String, String] + + def close(): Unit = {} + +} + +/** + * Thread-safe cache that stores topic creation errors with per-entry expiration. 
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time + * - Capacity: enforced by insertion-order removal (keeps the most recently inserted entries) + */ +class ExpiringErrorCache(maxSize: Int) { + + private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long) + + private val byTopic = new java.util.HashMap[String, Entry]() + private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] { + override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs) + }) + private val lock = new ReentrantLock() + + def put(topicName: String, errorMessage: String, expirationTimeMs: Long): Unit = { + lock.lock() + try { + val existing = byTopic.get(topicName) + if (existing != null) { + // Remove old instance from structures + expiryQueue.remove(existing) + } + + val entry = Entry(topicName, errorMessage, expirationTimeMs) + byTopic.put(topicName, entry) + expiryQueue.add(entry) + + // Enforce capacity by removing entries with earliest expiration time first + while (byTopic.size() > maxSize && !expiryQueue.isEmpty) { + val evicted = expiryQueue.poll() + if (evicted != null) { + val current = byTopic.get(evicted.topicName) + if (current != null && (current eq evicted)) { + byTopic.remove(evicted.topicName) + } + } + } + } finally { + lock.unlock() + } + } + + def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = { + lock.lock() + try { + cleanupExpired(currentTimeMs) + val result = mutable.Map.empty[String, String] + topicNames.foreach { topicName => + val entry = byTopic.get(topicName) + if (entry != null) { + result.put(topicName, entry.errorMessage) + } + } + result.toMap + } finally { + lock.unlock() + } + } + + def cleanupExpired(currentTimeMs: Long): Unit = { + lock.lock() Review Comment: Why is this not private? Isn't this called from places where we already own the lock? 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2888,10 +2888,34 @@ class KafkaApis(val requestChannel: RequestChannel, ) } } else { - autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); + // Compute group-specific expiration time for freshly cached errors + val sessionTimeoutMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null)) + .map(_.streamsSessionTimeoutMs().toLong) + .getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong) + val expirationTimeMs = time.milliseconds() + sessionTimeoutMs Review Comment: Two things about this: - I believe we said that we want to use `heartbeatIntervalMs * 2` for the timeout, to make sure a client sees the topic creation errors. - I would pass a timeoutMs (a duration), not the expirationTimeMs (a point in time). We want to start the timeout from the point we receive the response, not here. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2889,9 +2889,31 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); + + // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status + val hasMissingInternalTopicsStatus = responseData.status() != null && + responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + + if (hasMissingInternalTopicsStatus) { + // Calculate group-specific error cache TTL + val errorCacheTtlMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null)) + .map(_.streamsSessionTimeoutMs().toLong) + .getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong) + + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(topicsToCreate.keys.toSet, errorCacheTtlMs) + if (cachedErrors.nonEmpty) { + val
missingInternalTopicStatus = + responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst() + val creationErrorDetails = cachedErrors.map { case (topic, error) => s"$topic ($error)" }.mkString(", ") + if (missingInternalTopicStatus.isPresent) { + missingInternalTopicStatus.get().setStatusDetail( + missingInternalTopicStatus.get().statusDetail() + s"; Creation failed: $creationErrorDetails." + ) Review Comment: Please check whether this case can actually happen, or handle it explicitly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org