aliehsaeedii commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2273217176
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -53,17 +55,34 @@ trait AutoTopicCreationManager { requestContext: RequestContext ): Unit + def getTopicCreationErrors( + topicNames: Set[String] + ): Map[String, String] + + def close(): Unit = {} + +} + +case class CachedTopicCreationError( + errorMessage: String, + time: Time +) { + val timestamp: Long = time.milliseconds() } class DefaultAutoTopicCreationManager( config: KafkaConfig, channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, - shareCoordinator: ShareCoordinator + shareCoordinator: ShareCoordinator, + time: Time ) extends AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]() + // Use session timeout instead of request timeout for better semantic alignment with client lifecycle Review Comment: Do we need to add `instead of...`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org