chickenchickenlove opened a new pull request, #20990:
URL: https://github.com/apache/kafka/pull/20990
### Background
This PR addresses the limitation where `StreamsBuilder` throws a
`TopologyException` when multiple source nodes subscribe to overlapping but not
identical sets of topics.
For example, the following code previously failed:
```java
builder.stream("topicA");
builder.stream(Arrays.asList("topicA", "topicB"));
```
`Error: Two source nodes are subscribed to overlapping but not equal input
topics`
### Changes
To support this, I have implemented a mechanism to flatten multi-topic
source nodes into individual single-topic source nodes before the topology
optimization phase.
- `InternalStreamsBuilder#flattenSourceNodesAndRearrange(...)`
  - Iterates through the root children to find `StreamSourceNodes`
    subscribed to multiple topics.
  - Splits these nodes into multiple single-topic source nodes.
  - Preserves the original `buildPriority` and child nodes (downstream
    processors) for each split node to ensure consistent topology construction.
- `InternalStreamsBuilder#mergeDuplicateSourceNodes(...)`
  - Removed the check that threw `TopologyException` for overlapping but
    unequal topics.
  - Since nodes are now flattened to single topics first, the existing
    merging logic simply coalesces nodes subscribed to the exact same topic (e.g.,
    the "topicA" node from the first stream and the "topicA" node derived from the
    second stream).
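
To illustrate the two steps above, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `FlattenAndMergeSketch`, `Node`, `flatten`, and `merge` are hypothetical names, child processors are modeled as plain strings, and keeping the first node's build priority on merge is an assumption of this sketch rather than a statement about the actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the flatten-then-merge idea; NOT the real InternalStreamsBuilder code.
final class FlattenAndMergeSketch {

    // Hypothetical stand-in for a source node (not the real StreamSourceNode).
    static final class Node {
        final List<String> topics;
        final int buildPriority;
        final List<String> children; // names of downstream processors

        Node(List<String> topics, int buildPriority, List<String> children) {
            this.topics = topics;
            this.buildPriority = buildPriority;
            this.children = new ArrayList<>(children); // each node owns its own copy
        }
    }

    // Step 1: split every multi-topic node into single-topic nodes,
    // copying the original build priority and children to each piece.
    static List<Node> flatten(List<Node> nodes) {
        List<Node> flat = new ArrayList<>();
        for (Node node : nodes) {
            for (String topic : node.topics) {
                flat.add(new Node(List.of(topic), node.buildPriority, node.children));
            }
        }
        return flat;
    }

    // Step 2: coalesce nodes that read exactly the same topic. The first node
    // seen is kept (an assumption of this sketch) and later duplicates
    // contribute their children to it.
    static Map<String, Node> merge(List<Node> flat) {
        Map<String, Node> byTopic = new LinkedHashMap<>();
        for (Node node : flat) {
            String topic = node.topics.get(0);
            Node existing = byTopic.get(topic);
            if (existing == null) {
                byTopic.put(topic, node);
            } else {
                existing.children.addAll(node.children);
            }
        }
        return byTopic;
    }
}
```

In this model, a `topicA` node plus a `topicA`/`topicB` node end up as one `topicA` node carrying both sets of children and one `topicB` node carrying the second stream's children, so no overlap check is needed.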
### Note
While working on this, I found a bug, or at least non-deterministic behavior.
More details are available in KAFKA-19923
(https://issues.apache.org/jira/browse/KAFKA-19923).
That issue has existed independently of this PR and has been present for
quite some time.
In addition, once an affected topology actually starts running, the bug surfaces
as a `ClassCastException`, which immediately terminates the Kafka Streams application.
Because of this fail-fast behavior, the bug is unlikely to affect any
real-world, correctly configured Kafka Streams deployments.
While this PR does relax some of the previous constraints, I believe it
remains highly unlikely for users to subscribe to the same topic with different
ConsumedInternal configurations. Therefore, I do not expect this change to
introduce any practical risk.
That said, it may still be helpful to document that all source nodes reading
from the same topic should use semantically identical ConsumedInternal
configurations.
Here, “identical” means that they should not differ in their
TimestampExtractor or their key and value Serdes.
I hope this clarification is helpful.
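
As a rough illustration of what "semantically identical" means here, the sketch below compares only the three components named above. `SourceConfigSketch`, `Config`, and `semanticallyIdentical` are hypothetical names used for illustration; this is not the real `ConsumedInternal` class, and the components are modeled as plain strings.

```java
import java.util.Objects;

// Illustrative model only; NOT the real ConsumedInternal API.
final class SourceConfigSketch {

    // Hypothetical holder for the three settings the note calls out.
    static final class Config {
        final String keySerde;
        final String valueSerde;
        final String timestampExtractor;

        Config(String keySerde, String valueSerde, String timestampExtractor) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
            this.timestampExtractor = timestampExtractor;
        }
    }

    // Two configs count as "semantically identical" in this sketch when the
    // key Serde, value Serde, and TimestampExtractor all match.
    static boolean semanticallyIdentical(Config a, Config b) {
        return Objects.equals(a.keySerde, b.keySerde)
            && Objects.equals(a.valueSerde, b.valueSerde)
            && Objects.equals(a.timestampExtractor, b.timestampExtractor);
    }
}
```

Under this model, two source nodes on the same topic that differ only in, say, the value Serde would be flagged as inconsistent, which is the kind of setup the note above recommends avoiding.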
### Result
- Closes: https://issues.apache.org/jira/browse/KAFKA-10721