mjsax commented on code in PR #20325: URL: https://github.com/apache/kafka/pull/20325#discussion_r2268282851
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager( ) extends AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]() + private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x request timeout Review Comment: Not sure if we should couple this to request timeout? -- Might be better to couple it to session timeout? If a client does not heartbeat within session timeout, we would remove it from the group. Side question: would we need to track error per streams group, and use a group specific ttl, given that each group could set an individual session timeout? ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager( } } + override def getTopicCreationErrors( + topicNames: Set[String] + ): Map[String, String] = { + val currentTime = System.currentTimeMillis() + val errors = mutable.Map.empty[String, String] + val expiredKeys = mutable.Set.empty[String] + + // Check requested topics and collect expired keys + topicNames.foreach { topicName => + Option(topicCreationErrorCache.get(topicName)) match { + case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs => Review Comment: Why would we exclude the error message if we still have it? -- I thought the ttl would apply for the case, that we never returned an error, and want to drop it on the floor, via some cleanup process? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager( clearInflightRequests(creatableTopics) if (response.authenticationException() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception") + cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication failed") Review Comment: Should we use the error message from `response.authenticationException()` instead of a manually created error message? Also wondering, why we don't log the actual exception error message in the `warn` above? ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager( (creatableTopics, uncreatableTopics) } + + private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: String): Unit = { + val timestamp = System.currentTimeMillis() + topicNames.foreach { topicName => + topicCreationErrorCache.put(topicName, CachedTopicCreationError(errorMessage, timestamp)) + } + } + + private def cacheTopicCreationErrorsFromResponse(response: CreateTopicsResponse): Unit = { + val timestamp = System.currentTimeMillis() Review Comment: One more. 
########## core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala: ########## @@ -393,4 +393,176 @@ class AutoTopicCreationManagerTest { .setNumPartitions(numPartitions) .setReplicationFactor(replicationFactor) } + + @Test + def testTopicCreationErrorCaching(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator) + + val topics = Map( + "test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate a CreateTopicsResponse with errors + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("test-topic-1") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'test-topic-1' already exists.") + createTopicsResponseData.topics().add(topicResult) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + // Trigger the completion handler + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Verify that the error was cached + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1")) + assertEquals(1, cachedErrors.size) + 
assertTrue(cachedErrors.contains("test-topic-1")) + assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1")) + } + + @Test + def testGetTopicCreationErrorsWithMultipleTopics(): Unit = { + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator) + + val topics = Map( + "success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1), + "failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrincipal() + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Simulate mixed response - one success, one failure + val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData() + createTopicsResponseData.topics().add( + new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("success-topic") + .setErrorCode(Errors.NONE.code()) + ) + createTopicsResponseData.topics().add( + new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult() + .setName("failed-topic") + .setErrorCode(Errors.POLICY_VIOLATION.code()) + .setErrorMessage("Policy violation") + ) + + val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData) + val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, createTopicsResponse) + + argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse) + + // Only the failed topic 
should be cached + val cachedErrors = autoTopicCreationManager.getTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic")) + assertEquals(1, cachedErrors.size) + assertTrue(cachedErrors.contains("failed-topic")) + assertEquals("Policy violation", cachedErrors("failed-topic")) + } + + @Test + def testLazyCleanupOfExpiredCacheEntries(): Unit = { + // Test will verify that expired entries are cleaned up when accessed + // We'll simulate the passage of time by directly manipulating the cache + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + brokerToController, + groupCoordinator, + transactionCoordinator, + shareCoordinator) + + // Manually add an expired entry to the cache using reflection Review Comment: Seems you need to use reflection as you cannot modify the time? If we use `MockTime` in the test, we should be able to avoid reflection. ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -64,6 +77,9 @@ class DefaultAutoTopicCreationManager( ) extends AutoTopicCreationManager with Logging { private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]() + private val errorCacheTtlMs = config.requestTimeoutMs.toLong * 3 // 3x request timeout + private val maxCacheSize = 1000 Review Comment: Wondering why we would need to bound the cache size? What is the reasoning for this? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager( } } + override def getTopicCreationErrors( + topicNames: Set[String] + ): Map[String, String] = { + val currentTime = System.currentTimeMillis() + val errors = mutable.Map.empty[String, String] + val expiredKeys = mutable.Set.empty[String] + + // Check requested topics and collect expired keys + topicNames.foreach { topicName => + Option(topicCreationErrorCache.get(topicName)) match { + case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs => + errors.put(topicName, cachedError.errorMessage) + case Some(_) => + expiredKeys += topicName Review Comment: Not sure if I understand this logic? I thought we would expire an entry, if we never returned it to the client, and if TTL passed? ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager( } } + override def getTopicCreationErrors( + topicNames: Set[String] + ): Map[String, String] = { + val currentTime = System.currentTimeMillis() Review Comment: We should never call `System.currentTimeMillis()` directly, but use a `Time` object -- as the `AutoTopicCreationManager` does not have one yet, we need to add it to the constructor and make available, and trace back where `AutoTopicCreationManager` is set up to find the `Time` object there. Using `Time` interface allows us to mock time in unit tests. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging { ) } + @Test + def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = { Review Comment: It's a little unclear to me, what this method is supposed to actually verify? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager( clearInflightRequests(creatableTopics) if (response.authenticationException() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception") + cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication failed") } else if (response.versionMismatch() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with invalid version exception") + cacheTopicCreationErrors(creatableTopics.keys.toSet, "Version mismatch") Review Comment: As above ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -112,6 +128,45 @@ class DefaultAutoTopicCreationManager( } } + override def getTopicCreationErrors( + topicNames: Set[String] + ): Map[String, String] = { + val currentTime = System.currentTimeMillis() + val errors = mutable.Map.empty[String, String] + val expiredKeys = mutable.Set.empty[String] + + // Check requested topics and collect expired keys + topicNames.foreach { topicName => + Option(topicCreationErrorCache.get(topicName)) match { + case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs => + errors.put(topicName, cachedError.errorMessage) + case Some(_) => Review Comment: If we are using `Some(cachedError)` above, would this case actually ever be executed (I am not a Scala person, but my understanding is, that `Some(cachedError)` would be a "catch all")? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager( (creatableTopics, uncreatableTopics) } + + private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: String): Unit = { + val timestamp = System.currentTimeMillis() Review Comment: As above ########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -263,4 +325,31 @@ class DefaultAutoTopicCreationManager( (creatableTopics, uncreatableTopics) } + + private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: String): Unit = { + val timestamp = System.currentTimeMillis() + topicNames.foreach { topicName => + topicCreationErrorCache.put(topicName, CachedTopicCreationError(errorMessage, timestamp)) Review Comment: Should we change `CachedTopicCreationError` constructor to set timestamp automatically, instead of passing it in? If we just get current ms ts anyway, we can move the code into the constructor. Or would we ever need to pass in something different? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2889,6 +2889,24 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); + + // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status + val hasMissingInternalTopicsStatus = responseData.status() != null && + responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) Review Comment: Not sure if I can follow? Why would we need to check `MISSING_INTERNAL_TOPICS` status? 
########## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ########## @@ -135,10 +190,17 @@ class DefaultAutoTopicCreationManager( clearInflightRequests(creatableTopics) if (response.authenticationException() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception") + cacheTopicCreationErrors(creatableTopics.keys.toSet, "Authentication failed") } else if (response.versionMismatch() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with invalid version exception") + cacheTopicCreationErrors(creatableTopics.keys.toSet, "Version mismatch") } else { - debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.") + response.responseBody() match { + case createTopicsResponse: CreateTopicsResponse => + cacheTopicCreationErrorsFromResponse(createTopicsResponse) Review Comment: I thought, for this `else` branch, the request was successful and no error would be returned? 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10949,6 +10950,59 @@ class KafkaApisTest extends Logging { ) } + @Test + def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(util.Map.of(StreamsVersion.FEATURE_NAME, 1.toShort)) + + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) + + val future = new CompletableFuture[StreamsGroupHeartbeatResult]() + when(groupCoordinator.streamsGroupHeartbeat( + requestChannelRequest.context, + streamsGroupHeartbeatRequest + )).thenReturn(future) + + // Mock AutoTopicCreationManager to return cached errors + val mockAutoTopicCreationManager = mock(classOf[AutoTopicCreationManager]) + when(mockAutoTopicCreationManager.getTopicCreationErrors(Set("test-topic"))) + .thenReturn(Map("test-topic" -> "INVALID_REPLICATION_FACTOR")) + + kafkaApis = createKafkaApis(autoTopicCreationManager = Some(mockAutoTopicCreationManager)) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + // Group coordinator returns MISSING_INTERNAL_TOPICS status and topics to create + val missingTopics = util.Map.of("test-topic", new CreatableTopic()) + val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() + .setMemberId("member") + .setStatus(util.List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("Internal topics are missing: [test-topic]") + )) + + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics)) + val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) + + assertEquals(Errors.NONE.code, response.data.errorCode()) + assertEquals(null, response.data.errorMessage()) + + // Verify that the cached error was appended to the existing status detail Review Comment: Are we really verifying this? It seems our test code assembles the `StreamsGroupHeartbeatResponseData`, so we don't really execute prod code? So are we only verifying that our test code sets up the right response? For this case, it seems the test would not actually test anything? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org