[ https://issues.apache.org/jira/browse/KAFKA-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923100#comment-17923100 ]
appchemist edited comment on KAFKA-18692 at 2/2/25 2:38 PM:
------------------------------------------------------------

Hi [~mjsax],

I don't have a full grasp of the Streams code yet, but if you don't mind, can I try that POC with your feedback?

was (Author: appchemist):
    I don't have a full grasp of the Streams code yet, but if you don't mind, can I try that POC with your feedback?

> Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation"
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18692
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18692
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>
> In `KStreamImpl`, we use a flag `repartitionRequired` which allows us to
> track whether an operator (or one of its transitive ancestors) modified the
> key, so that when a key-dependent operation like an aggregation or join is
> executed, we know whether we need to insert an auto-repartition step.
> In parallel, we also track key-changing operations in our internal "topology
> graph" representation (which is used by our topology optimization layer), via
> GraphNode#keyChangingOperation().
> Thus, we have two independent code paths that do a similar thing (note: the
> two are not semantically identical, so we need to be careful to get this
> right; more details below). To avoid subtle bugs, it might be worth
> refactoring the code to unify both.
> The high-level idea would be (without me looking into details) to remove the
> `KStreamImpl#repartitionRequired` flag altogether, as we already pass a
> `GraphNode` into `KStreamImpl` and thus can access
> `isKeyChangingOperation()`. However, `isKeyChangingOperation()` only tracks
> whether the _current_ node is key-changing, and if it returns `false` we
> don't know anything about its ancestors. Thus, the semantics differ from
> those of the `repartitionRequired` flag, which already considers ancestor
> information.
> Hence, we might need to traverse the `GraphNode` structure backwards to
> verify whether a parent changed the key or not (cf.
> `InternalStreamsBuilder#getKeyChangingParentNode()`, which I believe we could
> re-use).
> Another thing to consider is that some operators, like `repartition()`,
> explicitly reset `repartitionRequired=false` in a hard-coded way. However,
> `keyChangingOperation=false` does not carry this information: we don't know
> whether the current operator just does not touch the key, or whether the
> current operator _ensures_ that we are partitioned by the current key. I.e.,
> we need a new way to track this information on `GraphNode`, so that we can
> stop the backward traversal when we hit a node that does this "reset".
> Overall, I believe it would be good to do this rewrite (based on an educated
> guess...), but it might also turn out (after some POC coding) that it's not a
> good idea and not worth doing, as it could add more problems than it solves,
> and we might just need to close this ticket as "won't fix". Only a PR
> could tell for sure.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
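
For illustration only, here is a minimal sketch of the backward traversal the ticket describes. It does not use the real Kafka Streams classes: `SketchNode`, its `partitioningEnsured` flag, and `RepartitionSketch` are hypothetical names invented for this example, and the flag stands in for the "new way to track this information on `GraphNode`" that the ticket asks for. The assumption is that a repartition is needed if any backward path reaches a key-changing node before it reaches a node that ensures partitioning.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a topology graph node; NOT the real Kafka Streams GraphNode.
final class SketchNode {
    final String name;
    final boolean keyChanging;          // mirrors the idea behind isKeyChangingOperation()
    final boolean partitioningEnsured;  // assumed new flag: node guarantees partitioning by the current key
    final List<SketchNode> parents;

    SketchNode(String name, boolean keyChanging, boolean partitioningEnsured, SketchNode... parents) {
        this.name = name;
        this.keyChanging = keyChanging;
        this.partitioningEnsured = partitioningEnsured;
        this.parents = List.of(parents);
    }
}

final class RepartitionSketch {
    // Walks the graph backwards from `start`. Returns true if some path reaches a
    // key-changing node before it reaches a node that ensures partitioning.
    static boolean repartitionRequired(SketchNode start) {
        Deque<SketchNode> pending = new ArrayDeque<>();
        Set<SketchNode> visited = new HashSet<>();
        pending.push(start);
        while (!pending.isEmpty()) {
            SketchNode node = pending.pop();
            if (!visited.add(node)) {
                continue; // already examined via another path
            }
            if (node.keyChanging) {
                return true;
            }
            if (node.partitioningEnsured) {
                continue; // the "reset": do not look past this node on this path
            }
            for (SketchNode parent : node.parents) {
                pending.push(parent);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SketchNode source = new SketchNode("source", false, false);
        SketchNode selectKey = new SketchNode("selectKey", true, false, source);
        SketchNode mapValues = new SketchNode("mapValues", false, false, selectKey);
        SketchNode repartition = new SketchNode("repartition", false, true, mapValues);
        SketchNode filter = new SketchNode("filter", false, false, repartition);

        System.out.println(repartitionRequired(mapValues)); // true: an ancestor changed the key
        System.out.println(repartitionRequired(filter));    // false: traversal stops at "repartition"
    }
}
```

The sketch treats a node with several parents (for example, a merge) conservatively: one key-changing ancestor on any path is enough to require a repartition. Whether the real `GraphNode` structure and `getKeyChangingParentNode()` can support this shape is exactly what the proposed POC would need to find out.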