RaidenE1 opened a new pull request, #20325: URL: https://github.com/apache/kafka/pull/20325
# Summary

This PR implements error caching for internal topic creation failures in Kafka Streams, allowing errors to be surfaced to users via the Streams group heartbeat status instead of only appearing in broker logs.

# Problem

Currently, when internal topic creation fails during Streams group heartbeat processing, the error messages are only written to the broker logs and are not exposed to users. As mentioned in the code comments around KafkaApis.scala:2857-2893, the result of the create topic call forwarded to the controller is not awaited, so if an internal topic fails to be created, users cannot see the specific reason for the failure.

# Solution

1. Error Caching in AutoTopicCreationManager
   - Added CachedTopicCreationError case class to store error messages with timestamps
   - Implemented getTopicCreationErrors() method with:
     - Lazy cleanup: Expired entries are removed during access based on configurable TTL
     - Size limits: Cache is limited to 1000 entries, with oldest entries removed when exceeded
     - TTL based on existing config: Uses 3 × request.timeout.ms (default: 90 seconds) for cache expiration
   - Enhanced ControllerRequestCompletionHandler.onComplete() to parse CreateTopicsResponse and cache errors for failed topics only
   - Added proper resource cleanup in close() method
2. Integration with KafkaApis
   - Enhanced Streams group heartbeat processing in KafkaApis.scala
   - When MISSING_INTERNAL_TOPICS status is detected, query cached errors and append to status details
   - Only query cache when Group Coordinator has already reported missing topics
3. Lifecycle Management
   - Added autoTopicCreationManager.close() call in BrokerServer.shutdown() to ensure proper cleanup

# Key Features

- Thread-safe: Uses ConcurrentHashMap for concurrent access
- Memory efficient: TTL-based expiration and size limits keep the cache from growing without bound
- Configurable TTL: Based on existing request.timeout.ms configuration (3× multiplier)
- Lazy cleanup: No background threads needed; cleanup happens during normal operation
- Selective caching: Only caches actual failures (errorCode != NONE); successful creations are ignored
- Comprehensive error handling: Handles authentication failures, version mismatches, and topic-specific errors
- Backward compatible: No changes to existing APIs or behavior

# Configuration

The error cache TTL is automatically calculated as 3 × request.timeout.ms:

- Default: 90 seconds (3 × 30s default request timeout)
- Configurable: Adjusts automatically when request.timeout.ms is modified
- Cache size limit: 1000 entries (hardcoded)

# Testing

- Added comprehensive unit tests for error caching, TTL cleanup, and size limit management
- Added integration test for KafkaApis to verify end-to-end functionality
- Updated test cases to use realistic TTL values based on test configuration
- All existing tests pass without modification

# Code Changes

- AutoTopicCreationManager.scala: Added error caching functionality (~70 lines)
- KafkaApis.scala: Enhanced Streams heartbeat processing (~15 lines)
- BrokerServer.scala: Added cleanup call in shutdown (~2 lines)
- Test files: Added comprehensive test coverage (~170 lines)
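
For readers who want a concrete picture of the cache behavior described under Solution, the following is a minimal sketch, not the code from this PR. Only the names CachedTopicCreationError and getTopicCreationErrors come from the description; the class name TopicCreationErrorCache, its constructor parameters, the put and size methods, and the explicit nowMs argument are assumptions made for illustration. It assumes Scala 2.13 for scala.jdk.CollectionConverters.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Error message plus the time it was recorded (field names are assumed).
final case class CachedTopicCreationError(errorMessage: String, timestampMs: Long)

// Hypothetical stand-alone cache; the PR places this logic inside
// AutoTopicCreationManager rather than in a separate class.
class TopicCreationErrorCache(requestTimeoutMs: Long,
                              maxEntries: Int = 1000,
                              ttlMultiplier: Int = 3) {
  // TTL derived from the request timeout, e.g. 3 x 30000 ms = 90000 ms.
  private val ttlMs: Long = ttlMultiplier * requestTimeoutMs
  private val errors = new ConcurrentHashMap[String, CachedTopicCreationError]()

  // Record a failure for a topic, then enforce the size limit.
  def put(topic: String, message: String, nowMs: Long): Unit = {
    errors.put(topic, CachedTopicCreationError(message, nowMs))
    evictOldestIfNeeded()
  }

  // Lazy cleanup: expired entries are dropped on access, so no
  // background thread is required.
  def getTopicCreationErrors(topics: Set[String], nowMs: Long): Map[String, String] = {
    errors.entrySet().removeIf(e => nowMs - e.getValue.timestampMs > ttlMs)
    topics.flatMap(t => Option(errors.get(t)).map(e => t -> e.errorMessage)).toMap
  }

  // Remove the oldest entries once the cache exceeds maxEntries.
  private def evictOldestIfNeeded(): Unit = {
    val excess = errors.size() - maxEntries
    if (excess > 0) {
      errors.entrySet().asScala.toSeq
        .sortBy(_.getValue.timestampMs)
        .take(excess)
        .foreach(e => errors.remove(e.getKey))
    }
  }

  def size: Int = errors.size()

  // Release cached state on shutdown.
  def close(): Unit = errors.clear()
}
```

Passing nowMs explicitly is a choice made here so that expiry can be exercised without sleeping; a real implementation would more likely read the time from a clock abstraction. Sorting all entries on eviction is simple but linear-logarithmic in the cache size, which is tolerable for a cap of 1000 entries.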