aliehsaeedii commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2273417888
########## core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala: ########## @@ -393,4 +403,119 @@ class AutoTopicCreationManagerTest { .setNumPartitions(numPartitions) .setReplicationFactor(replicationFactor) } + + @Test + def testTopicCreationErrorCaching(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + val topics = Map( + "test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate a CreateTopicsResponse with errors + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("test-topic-1") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'test-topic-1' already exists.") + createTopicsResponseData.topics().add(topicResult) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + // Trigger the completion handler + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Verify that the error was cached + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1")) + assertEquals(1, cachedErrors.size) + 
assertTrue(cachedErrors.contains("test-topic-1")) + assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1")) + } + + @Test + def testGetTopicCreationErrorsWithMultipleTopics(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + val topics = Map( + "success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1), + "failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate mixed response - one success, one failure + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + createTopicsResponseData.topics().add( + new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("success-topic") + .setErrorCode(Errors.NONE.code()) + ) + createTopicsResponseData.topics().add( + new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("failed-topic") + .setErrorCode(Errors.POLICY_VIOLATION.code()) + .setErrorMessage("Policy violation") + ) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Only the 
failed topic should be cached + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic")) + assertEquals(1, cachedErrors.size) + assertTrue(cachedErrors.contains("failed-topic")) + assertEquals("Policy violation", cachedErrors("failed-topic")) + } + + @Test + def testErrorCacheTTL(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator, + mockTime) + + // 测试getTopicCreationErrors在没有缓存时返回空 Review Comment: ?! [Editorial note: the flagged code comment is Chinese for "Test that getTopicCreationErrors returns empty when nothing is cached"; the reviewer is presumably objecting to a non-English comment in the test source.] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org