lucasbru commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2316984834


##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -122,9 +232,11 @@ class DefaultAutoTopicCreationManager(
       override def onComplete(response: ClientResponse): Unit = {
         clearInflightRequests(creatableTopics)
         if (response.authenticationException() != null) {
-          warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception")
+          val authException = response.authenticationException()

Review Comment:
   nit: let's not change this function at all; it's not related to what we are doing.



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -50,21 +52,121 @@ trait AutoTopicCreationManager {
 
   def createStreamsInternalTopics(
     topics: Map[String, CreatableTopic],
-    requestContext: RequestContext
+    requestContext: RequestContext,
+    expirationTimeMs: Long
   ): Unit
 
+  def getStreamsInternalTopicCreationErrors(
+    topicNames: Set[String],
+    currentTimeMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by evicting the entries with the earliest expiration time first
+ */
+class ExpiringErrorCache(maxSize: Int) {
+
+  private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
+
+  private val byTopic = new java.util.HashMap[String, Entry]()
+  private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
+    override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+  })
+  private val lock = new ReentrantLock()
+
+  def put(topicName: String, errorMessage: String, expirationTimeMs: Long): Unit = {
+    lock.lock()
+    try {
+      val existing = byTopic.get(topicName)
+      if (existing != null) {
+        // Remove old instance from structures
+        expiryQueue.remove(existing)
+      }
+
+      val entry = Entry(topicName, errorMessage, expirationTimeMs)
+      byTopic.put(topicName, entry)
+      expiryQueue.add(entry)
+
+      // Enforce capacity by removing entries with earliest expiration time first
+      while (byTopic.size() > maxSize && !expiryQueue.isEmpty) {
+        val evicted = expiryQueue.poll()
+        if (evicted != null) {
+          val current = byTopic.get(evicted.topicName)
+          if (current != null && (current eq evicted)) {
+            byTopic.remove(evicted.topicName)
+          }
+        }
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = {
+    lock.lock()
+    try {
+      cleanupExpired(currentTimeMs)
+      val result = mutable.Map.empty[String, String]
+      topicNames.foreach { topicName =>
+        val entry = byTopic.get(topicName)
+        if (entry != null) {
+          result.put(topicName, entry.errorMessage)
+        }
+      }
+      result.toMap
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def cleanupExpired(currentTimeMs: Long): Unit = {
+    lock.lock()
+    try {
+      while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
+        val expired = expiryQueue.poll()
+        val current = byTopic.get(expired.topicName)
+        if (current != null && (current eq expired)) {
+          byTopic.remove(expired.topicName)
+        }
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def clear(): Unit = {
+    lock.lock()
+    try {
+      byTopic.clear()
+      expiryQueue.clear()
+    } finally {
+      lock.unlock()
+    }
+  }
 }
 
+
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
   channelManager: NodeToControllerChannelManager,
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
-  shareCoordinator: ShareCoordinator
+  shareCoordinator: ShareCoordinator,
+  time: Time,
+  cacheCapacity: Int = 1000

Review Comment:
   nit: topicErrorCacheCapacity
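   
   i.e., something along these lines (just the renamed parameter; the rest of the signature stays as in the PR):
   
   ```scala
     shareCoordinator: ShareCoordinator,
     time: Time,
     topicErrorCacheCapacity: Int = 1000
   ```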



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -50,21 +52,121 @@ trait AutoTopicCreationManager {
 
   def createStreamsInternalTopics(
     topics: Map[String, CreatableTopic],
-    requestContext: RequestContext
+    requestContext: RequestContext,
+    expirationTimeMs: Long
   ): Unit
 
+  def getStreamsInternalTopicCreationErrors(
+    topicNames: Set[String],
+    currentTimeMs: Long
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by evicting the entries with the earliest expiration time first
+ */
+class ExpiringErrorCache(maxSize: Int) {
+
+  private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
+
+  private val byTopic = new java.util.HashMap[String, Entry]()
+  private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
+    override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+  })
+  private val lock = new ReentrantLock()
+
+  def put(topicName: String, errorMessage: String, expirationTimeMs: Long): Unit = {
+    lock.lock()
+    try {
+      val existing = byTopic.get(topicName)
+      if (existing != null) {
+        // Remove old instance from structures
+        expiryQueue.remove(existing)
+      }
+
+      val entry = Entry(topicName, errorMessage, expirationTimeMs)
+      byTopic.put(topicName, entry)
+      expiryQueue.add(entry)
+
+      // Enforce capacity by removing entries with earliest expiration time first
+      while (byTopic.size() > maxSize && !expiryQueue.isEmpty) {
+        val evicted = expiryQueue.poll()
+        if (evicted != null) {
+          val current = byTopic.get(evicted.topicName)
+          if (current != null && (current eq evicted)) {
+            byTopic.remove(evicted.topicName)
+          }
+        }
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = {
+    lock.lock()
+    try {
+      cleanupExpired(currentTimeMs)
+      val result = mutable.Map.empty[String, String]
+      topicNames.foreach { topicName =>
+        val entry = byTopic.get(topicName)
+        if (entry != null) {
+          result.put(topicName, entry.errorMessage)
+        }
+      }
+      result.toMap
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def cleanupExpired(currentTimeMs: Long): Unit = {
+    lock.lock()

Review Comment:
   Why is this not private? Isn't this called from places where we already own the lock?
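   
   If it can be made private, a minimal sketch (same body as in the PR) could look like the following. `ReentrantLock` is reentrant, so re-acquiring the lock from `put`/`getErrorsForTopics` is safe; alternatively the lock/unlock pair could be dropped here and lock ownership left to the callers:
   
   ```scala
   private def cleanupExpired(currentTimeMs: Long): Unit = {
     lock.lock()
     try {
       // Drop entries whose expiration time has passed; only remove the map entry
       // if it still refers to the same instance that just expired.
       while (!expiryQueue.isEmpty && expiryQueue.peek().expirationTimeMs <= currentTimeMs) {
         val expired = expiryQueue.poll()
         val current = byTopic.get(expired.topicName)
         if (current != null && (current eq expired))
           byTopic.remove(expired.topicName)
       }
     } finally {
       lock.unlock()
     }
   }
   ```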



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2888,10 +2888,34 @@ class KafkaApis(val requestChannel: RequestChannel,
                 )
               }
             } else {
-              autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext);
+              // Compute group-specific expiration time for freshly cached errors
+              val sessionTimeoutMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
+                .map(_.streamsSessionTimeoutMs().toLong)
+                .getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong)
+              val expirationTimeMs = time.milliseconds() + sessionTimeoutMs

Review Comment:
   Two things about this:
   
    - I believe we said that we want to use `heartbeatIntervalMs * 2` for the timeout, to make sure a client sees the topic creation errors.
    - I would pass a timeoutMs (a duration), not the expirationTimeMs (an absolute time). We want to start the timeout from the point we receive the response, not here (see the sketch below).
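   
   A rough sketch of that shape, assuming the heartbeat-interval accessors mirror the session-timeout ones used in the PR (`streamsHeartbeatIntervalMs()` / `streamsGroupHeartbeatIntervalMs()` are assumptions and should be verified):
   
   ```scala
   // Pass a duration; the manager computes the expiration time itself when the
   // CreateTopics response arrives, not at the point the heartbeat is handled.
   val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
     .map(_.streamsHeartbeatIntervalMs().toLong)   // assumed group-level accessor
     .getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)   // assumed default accessor
   val errorTimeoutMs = 2 * heartbeatIntervalMs
   autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext, errorTimeoutMs)
   ```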



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2889,9 +2889,31 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             } else {
               autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext);
+
+              // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status
+              val hasMissingInternalTopicsStatus = responseData.status() != null && 
+                responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+
+              if (hasMissingInternalTopicsStatus) {
+                // Calculate group-specific error cache TTL
+                val errorCacheTtlMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
+                  .map(_.streamsSessionTimeoutMs().toLong)
+                  .getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong)
+
+                val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(topicsToCreate.keys.toSet, errorCacheTtlMs)
+                if (cachedErrors.nonEmpty) {
+                  val missingInternalTopicStatus =
+                    responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+                  val creationErrorDetails = cachedErrors.map { case (topic, error) => s"$topic ($error)" }.mkString(", ")
+                  if (missingInternalTopicStatus.isPresent) {
+                    missingInternalTopicStatus.get().setStatusDetail(
+                      missingInternalTopicStatus.get().statusDetail() + s"; Creation failed: $creationErrorDetails."
+                    )

Review Comment:
   Please check whether this case can actually happen, or handle the branch where no MISSING_INTERNAL_TOPICS status entry is present.
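   
   If the status entry can be absent here, one way to handle it could be the sketch below (the `StreamsGroupHeartbeatResponseData.Status` usage is an assumption based on the generated protocol classes and should be verified):
   
   ```scala
   if (missingInternalTopicStatus.isPresent) {
     missingInternalTopicStatus.get().setStatusDetail(
       missingInternalTopicStatus.get().statusDetail() + s"; Creation failed: $creationErrorDetails."
     )
   } else {
     // Assumption: rather than silently dropping the cached errors, add a
     // MISSING_INTERNAL_TOPICS status entry carrying the creation failures.
     responseData.status().add(
       new StreamsGroupHeartbeatResponseData.Status()
         .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
         .setStatusDetail(s"Internal topics are missing; creation failed: $creationErrorDetails.")
     )
   }
   ```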


