chickenchickenlove opened a new pull request, #20990:
URL: https://github.com/apache/kafka/pull/20990

   ### Background
   
   This PR addresses the limitation where `StreamsBuilder` throws a 
`TopologyException` when multiple source nodes subscribe to overlapping but not 
identical sets of topics.
   
   For example, the following code previously failed:
   ```java
   builder.stream("topicA");
   builder.stream(Arrays.asList("topicA", "topicB"));
   ```
   `Error: Two source nodes are subscribed to overlapping but not equal input 
topics`
   
   ### Changes
   To support this, I have implemented a mechanism to flatten multi-topic 
source nodes into individual single-topic source nodes before the topology 
optimization phase.
   
   - `InternalStreamsBuilder#flattenSourceNodesAndRearrange(...)`
     - Iterates over the root's children to find `StreamSourceNode`s 
subscribed to multiple topics.
     - Splits these nodes into multiple single-topic source nodes.
     - Preserves the original `buildPriority` and child nodes (downstream 
processors) for each split node to ensure consistent topology construction.
   - `InternalStreamsBuilder#mergeDuplicateSourceNodes(...)`
     - Removed the check that threw `TopologyException` for overlapping but 
unequal topics.
     - Since nodes are now flattened to single topics first, the existing 
merging logic simply coalesces nodes subscribed to the exact same topic (e.g., 
the "topicA" node from the first stream and the "topicA" node derived from the 
second stream).
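
The flatten-then-merge idea above can be sketched in plain Java. This is only an illustration, not the code in this PR: `FlattenSketch`, `SourceNode`, `flatten`, and `merge` are hypothetical names, and the real `InternalStreamsBuilder` works on graph nodes rather than on lists of processor names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlattenSketch {

    // Hypothetical stand-in for a source node; NOT Kafka's StreamSourceNode.
    public static final class SourceNode {
        public final List<String> topics;
        public final int buildPriority;
        public final List<String> children; // names of downstream processors

        public SourceNode(List<String> topics, int buildPriority, List<String> children) {
            this.topics = topics;
            this.buildPriority = buildPriority;
            this.children = children;
        }
    }

    // Step 1: split each multi-topic source into single-topic sources,
    // copying the original buildPriority and children onto every split node.
    public static List<SourceNode> flatten(List<SourceNode> sources) {
        List<SourceNode> flat = new ArrayList<>();
        for (SourceNode source : sources) {
            for (String topic : source.topics) {
                flat.add(new SourceNode(
                    Collections.singletonList(topic), source.buildPriority, source.children));
            }
        }
        return flat;
    }

    // Step 2: coalesce nodes that read exactly the same topic by unioning their children.
    // Expects the single-topic nodes produced by flatten().
    public static Map<String, Set<String>> merge(List<SourceNode> flat) {
        Map<String, Set<String>> childrenByTopic = new LinkedHashMap<>();
        for (SourceNode node : flat) {
            childrenByTopic
                .computeIfAbsent(node.topics.get(0), topic -> new LinkedHashSet<>())
                .addAll(node.children);
        }
        return childrenByTopic;
    }

    public static void main(String[] args) {
        List<SourceNode> sources = Arrays.asList(
            new SourceNode(Arrays.asList("topicA"), 0, Arrays.asList("processor-1")),
            new SourceNode(Arrays.asList("topicA", "topicB"), 1, Arrays.asList("processor-2")));
        // Prints {topicA=[processor-1, processor-2], topicB=[processor-2]}
        System.out.println(merge(flatten(sources)));
    }
}
```

In this model, the "topicA" node from the first stream and the "topicA" node split out of the second stream end up as one entry feeding both downstream processors, while "topicB" feeds only the second.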
   
   ### Note 
   I found a bug, or at least non-deterministic behavior. 
   More details are available in KAFKA-19923 
(https://issues.apache.org/jira/browse/KAFKA-19923).
   
   That issue exists independently of this PR and has been present for 
quite some time.
   Moreover, once the affected topology starts running, it throws a 
`ClassCastException`, which immediately terminates the Kafka Streams application. 
Because of this fail-fast behavior, the bug is unlikely to affect any 
real-world, correctly configured Kafka Streams deployment.
   
   While this PR does relax some of the previous constraints, I believe it 
remains highly unlikely that users subscribe to the same topic with different 
`ConsumedInternal` configurations. Therefore, I do not expect this change to 
introduce any practical risk.
   
   That said, it may still be helpful to document that all source nodes reading 
from the same topic should use semantically identical `ConsumedInternal` 
configurations.
   Here, “identical” means that they must not differ in their 
`TimestampExtractor` or their key and value Serdes.
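
To make the meaning of “identical” concrete, here is a rough sketch of such a comparison. It is not part of this PR: `SourceConfigCheck`, `SourceConfig`, and `compatible` are hypothetical names, and comparing by type name is an assumption, since two serdes of the same class could still be configured differently.

```java
import java.util.Objects;

public class SourceConfigCheck {

    // Hypothetical summary of one source's settings; NOT Kafka's ConsumedInternal.
    public static final class SourceConfig {
        public final String keySerde;           // e.g. "StringSerde"
        public final String valueSerde;         // e.g. "LongSerde"
        public final String timestampExtractor; // null means "use the default"

        public SourceConfig(String keySerde, String valueSerde, String timestampExtractor) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
            this.timestampExtractor = timestampExtractor;
        }
    }

    // Two sources on the same topic are treated as compatible only if the
    // key Serde, value Serde, and TimestampExtractor all match.
    public static boolean compatible(SourceConfig a, SourceConfig b) {
        return Objects.equals(a.keySerde, b.keySerde)
            && Objects.equals(a.valueSerde, b.valueSerde)
            && Objects.equals(a.timestampExtractor, b.timestampExtractor);
    }

    public static void main(String[] args) {
        SourceConfig first = new SourceConfig("StringSerde", "StringSerde", null);
        SourceConfig second = new SourceConfig("StringSerde", "LongSerde", null);
        System.out.println(compatible(first, first));  // prints true
        System.out.println(compatible(first, second)); // prints false
    }
}
```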
   
   I hope this clarification is helpful.
   
   ### Result
   - Closes: https://issues.apache.org/jira/browse/KAFKA-10721


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
