RaidenE1 opened a new pull request, #20325:
URL: https://github.com/apache/kafka/pull/20325

   # Summary
   
     This PR caches internal topic creation failures for Kafka Streams so that the errors are surfaced to users through the Streams group heartbeat status instead of appearing only in the broker logs.
   
   # Problem
   
     Currently, when internal topic creation fails during Streams group heartbeat processing, the error is written only to the broker logs and is not exposed to users. As noted in the code comments around KafkaApis.scala:2857-2893, the create-topics call is forwarded to the controller but its result is not awaited, so when an internal topic cannot be created, users have no way to see the specific reason for the failure.
   
   # Solution
   
     1. Error Caching in AutoTopicCreationManager
   
     - Added CachedTopicCreationError case class to store error messages with 
timestamps
     - Implemented getTopicCreationErrors() method with:
       - Lazy cleanup: Expired entries are removed during access based on 
configurable TTL
       - Size limit: The cache holds at most 1000 entries; the oldest entries are removed when the limit is exceeded
       - TTL derived from existing config: Entries expire after 3 × request.timeout.ms (90 seconds with the default 30-second timeout)
     - Enhanced ControllerRequestCompletionHandler.onComplete() to parse 
CreateTopicsResponse and cache errors for
     failed topics only
     - Added proper resource cleanup in close() method
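
The caching behavior listed above (timestamped entries, lazy TTL cleanup on access, and eviction of the oldest entry once the size limit is exceeded) can be sketched roughly as follows. This is a minimal Java illustration, not the Scala code in this PR: the class name `TopicErrorCacheSketch`, its methods, and the explicit `nowMs` parameter (used here to keep the example deterministic) are all assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: class and method names are hypothetical,
// not the Scala code added to AutoTopicCreationManager in this PR.
final class TopicErrorCacheSketch {
    // One cached failure: the error message and the time it was recorded.
    static final class CachedError {
        final String message;
        final long timestampMs;

        CachedError(String message, long timestampMs) {
            this.message = message;
            this.timestampMs = timestampMs;
        }
    }

    private final ConcurrentHashMap<String, CachedError> cache = new ConcurrentHashMap<>();
    private final long ttlMs;
    private final int maxEntries;

    TopicErrorCacheSketch(long ttlMs, int maxEntries) {
        this.ttlMs = ttlMs;
        this.maxEntries = maxEntries;
    }

    // Record a failure; if the size limit is now exceeded, drop the oldest entry.
    void put(String topic, String message, long nowMs) {
        cache.put(topic, new CachedError(message, nowMs));
        if (cache.size() > maxEntries) {
            evictOldest();
        }
    }

    // Lazy cleanup: expired entries are removed here, on access, and then the
    // surviving errors for the requested topics are returned.
    Map<String, String> getErrors(Set<String> topics, long nowMs) {
        cache.entrySet().removeIf(e -> nowMs - e.getValue().timestampMs > ttlMs);
        Map<String, String> result = new HashMap<>();
        for (String topic : topics) {
            CachedError error = cache.get(topic);
            if (error != null) {
                result.put(topic, error.message);
            }
        }
        return result;
    }

    int size() {
        return cache.size();
    }

    // Linear scan for the entry with the smallest timestamp.
    private void evictOldest() {
        String oldestTopic = null;
        long oldestTs = Long.MAX_VALUE;
        for (Map.Entry<String, CachedError> e : cache.entrySet()) {
            if (oldestTopic == null || e.getValue().timestampMs < oldestTs) {
                oldestTopic = e.getKey();
                oldestTs = e.getValue().timestampMs;
            }
        }
        if (oldestTopic != null) {
            cache.remove(oldestTopic);
        }
    }
}
```

The linear scan in `evictOldest` is acceptable for a cache capped at 1000 entries; a larger cache would probably want an ordered structure instead.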
   
     2. Integration with KafkaApis
   
     - Enhanced Streams group heartbeat processing in KafkaApis.scala
     - When a MISSING_INTERNAL_TOPICS status is detected, the cached errors are queried and appended to the status detail
     - The cache is queried only when the Group Coordinator has already reported missing topics
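
As a rough illustration of the integration described above, the sketch below appends cached errors to a status detail only when the status is MISSING_INTERNAL_TOPICS, and does not touch the cache otherwise. It is written in Java for illustration rather than taken from KafkaApis.scala; the `enrichDetail` method, the `Supplier`-based cache lookup, and the "; Creation failed: ..." detail format are assumptions, not the PR's actual wording.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Illustrative sketch only: the method name and the detail format are hypothetical.
final class HeartbeatStatusSketch {
    static final String MISSING_INTERNAL_TOPICS = "MISSING_INTERNAL_TOPICS";

    // Returns the status detail, extended with cached creation errors when the
    // status is MISSING_INTERNAL_TOPICS. The cache is not consulted otherwise.
    static String enrichDetail(String status, String detail,
                               Supplier<Map<String, String>> cachedErrors) {
        if (!MISSING_INTERNAL_TOPICS.equals(status)) {
            return detail;
        }
        // Sorted copy so the appended text has a stable order.
        Map<String, String> errors = new TreeMap<>(cachedErrors.get());
        if (errors.isEmpty()) {
            return detail;
        }
        StringBuilder sb = new StringBuilder(detail).append("; Creation failed: ");
        String separator = "";
        for (Map.Entry<String, String> e : errors.entrySet()) {
            sb.append(separator)
              .append(e.getKey()).append(" (").append(e.getValue()).append(")");
            separator = ", ";
        }
        return sb.toString();
    }
}
```

Passing the lookup as a `Supplier` makes the "only query when missing topics were reported" rule explicit: for any other status the supplier is never invoked.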
   
     3. Lifecycle Management
   
     - Added autoTopicCreationManager.close() call in BrokerServer.shutdown() 
to ensure proper cleanup
   
   # Key Features
   
     - Thread-safe: Uses ConcurrentHashMap for concurrent access
     - Bounded memory: TTL-based expiration and the size limit keep the cache from growing without bound
     - Configurable TTL: Based on existing request.timeout.ms configuration (3× 
multiplier)
     - Lazy cleanup: No background threads needed - cleanup happens during 
normal operation
     - Selective caching: Only actual failures (errorCode != NONE) are cached; successful creations are ignored
     - Comprehensive error handling: Handles authentication failures, version 
mismatches, and topic-specific errors
     - Backward compatible: No changes to existing APIs or behavior
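
The selective-caching rule can be pictured with a small filter over per-topic results. This is a hedged Java sketch, not the PR's handler code: `TopicResult` is a hypothetical stand-in for the per-topic entries of a CreateTopicsResponse, and the fallback message for a missing error text is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: TopicResult is a hypothetical stand-in for the
// per-topic results carried by a CreateTopicsResponse.
final class SelectiveCachingSketch {
    static final short NONE = 0; // error code meaning "no error"

    static final class TopicResult {
        final String name;
        final short errorCode;
        final String errorMessage; // may be null

        TopicResult(String name, short errorCode, String errorMessage) {
            this.name = name;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Keep only topics whose creation failed; successful results are skipped.
    static Map<String, String> failuresOnly(List<TopicResult> results) {
        Map<String, String> failures = new LinkedHashMap<>();
        for (TopicResult r : results) {
            if (r.errorCode != NONE) {
                String message = r.errorMessage != null
                        ? r.errorMessage
                        : "error code " + r.errorCode; // assumed fallback text
                failures.put(r.name, message);
            }
        }
        return failures;
    }
}
```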
   
   # Configuration
   
     The error cache TTL is automatically calculated as 3 × request.timeout.ms:
     - Default: 90 seconds (3 × 30s default request timeout)
     - Configurable: Adjusts automatically when request.timeout.ms is modified
     - Cache size limit: 1000 entries (hardcoded)
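
The TTL arithmetic above amounts to a single multiplication. The Java sketch below shows it under assumed names (`ErrorCacheTtlSketch`, `errorCacheTtlMs`); only the 3× multiplier, the 1000-entry limit, and the 30-second default come from this description.

```java
// Illustrative sketch only: names are hypothetical; the multiplier and the
// size limit are the values stated in this PR description.
final class ErrorCacheTtlSketch {
    static final int TTL_MULTIPLIER = 3;
    static final int MAX_ENTRIES = 1000;

    // TTL in milliseconds for a given request.timeout.ms value.
    static long errorCacheTtlMs(long requestTimeoutMs) {
        // multiplyExact throws on overflow instead of silently wrapping.
        return Math.multiplyExact((long) TTL_MULTIPLIER, requestTimeoutMs);
    }
}
```

With the default request timeout of 30 000 ms, `errorCacheTtlMs(30_000L)` yields 90 000 ms, which matches the 90-second default quoted above.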
   
   # Testing
   
     - Added comprehensive unit tests for error caching, TTL cleanup, and size 
limit management
     - Added integration test for KafkaApis to verify end-to-end functionality
     - Updated test cases to use realistic TTL values based on test 
configuration
     - All existing tests pass without modification
   
   # Code Changes
   
     - AutoTopicCreationManager.scala: Added error caching functionality (~70 
lines)
     - KafkaApis.scala: Enhanced Streams heartbeat processing (~15 lines)
     - BrokerServer.scala: Added cleanup call in shutdown (~2 lines)
     - Test files: Added comprehensive test coverage (~170 lines)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
