aliehsaeedii commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2273419281


##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -393,4 +403,119 @@ class AutoTopicCreationManagerTest {
       .setNumPartitions(numPartitions)
       .setReplicationFactor(replicationFactor)
   }
+
+  @Test
+  def testTopicCreationErrorCaching(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate a CreateTopicsResponse with errors
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic-1")
+      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+      .setErrorMessage("Topic 'test-topic-1' already exists.")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    // Trigger the completion handler
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Verify that the error was cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("test-topic-1"))
+    assertEquals("Topic 'test-topic-1' already exists.", 
cachedErrors("test-topic-1"))
+  }
+
+  @Test
+  def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+
+    val topics = Map(
+      "success-topic" -> new 
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+      "failed-topic" -> new 
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate mixed response - one success, one failure
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("success-topic")
+        .setErrorCode(Errors.NONE.code())
+    )
+    createTopicsResponseData.topics().add(
+      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+        .setName("failed-topic")
+        .setErrorCode(Errors.POLICY_VIOLATION.code())
+        .setErrorMessage("Policy violation")
+    )
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+    // Only the failed topic should be cached
+    val cachedErrors = 
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", 
"failed-topic", "nonexistent-topic"))
+    assertEquals(1, cachedErrors.size)
+    assertTrue(cachedErrors.contains("failed-topic"))
+    assertEquals("Policy violation", cachedErrors("failed-topic"))
+  }
+
+  @Test 
+  def testErrorCacheTTL(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime)
+    
+    // 测试getTopicCreationErrors在没有缓存时返回空
+    val initialResult = 
autoTopicCreationManager.getTopicCreationErrors(Set("nonexistent-topic"))
+    assertTrue(initialResult.isEmpty)
+    
+    // 让时间前进,再次查询确保仍然为空

Review Comment:
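   These code comments contain non-English (Chinese) characters; please rewrite them in English. This is the same issue as noted above.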



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to