[ https://issues.apache.org/jira/browse/KAFKA-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923100#comment-17923100 ]

appchemist edited comment on KAFKA-18692 at 2/2/25 2:38 PM:
------------------------------------------------------------

Hi [~mjsax],

I don't have a full grasp of the Streams code yet,
but if you don't mind, could I give that POC a try, with your feedback along the way?



> Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation"
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18692
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18692
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>
> In `KStreamImpl`, we use a flag `repartitionRequired` that tracks whether an
> operator (or one of its transitive ancestors) modified the key. When a
> key-dependent operation such as an aggregation or a join is executed, this
> flag tells us whether we need to insert an automatic repartition step.
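A minimal Java sketch of how such an inherited flag behaves. It assumes a heavily simplified, hypothetical `SimpleStream` class, which is not the real `KStreamImpl`. A key-changing operator such as `selectKey()` sets the flag. A key-preserving operator such as `mapValues()` inherits it from its parent, and `repartition()` resets it. A key-dependent operation checks the flag to decide whether to plan a repartition step. The method `groupByKeyPlan()` and the step names it returns are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for KStreamImpl. It only models how
// a "repartitionRequired" flag is passed from one operator to the next.
final class SimpleStream {
    final String name;
    final boolean repartitionRequired;

    SimpleStream(final String name, final boolean repartitionRequired) {
        this.name = name;
        this.repartitionRequired = repartitionRequired;
    }

    // Key-changing operator (like selectKey()): downstream needs repartitioning.
    SimpleStream selectKey(final String opName) {
        return new SimpleStream(opName, true);
    }

    // Key-preserving operator (like mapValues()): inherit the parent's flag unchanged.
    SimpleStream mapValues(final String opName) {
        return new SimpleStream(opName, repartitionRequired);
    }

    // Explicit repartition(): data is now partitioned by the current key,
    // so the flag is reset in a hard-coded way.
    SimpleStream repartition(final String opName) {
        return new SimpleStream(opName, false);
    }

    // Key-dependent operation (hypothetical helper): insert an
    // auto-repartition step only if the flag says it is required.
    List<String> groupByKeyPlan() {
        final List<String> plan = new ArrayList<>();
        if (repartitionRequired) {
            plan.add(name + "-repartition");
        }
        plan.add(name + "-aggregate");
        return plan;
    }
}
```

For example, `source.selectKey(...).mapValues(...)` still reports `repartitionRequired == true`. The flag is carried forward from the `selectKey()` step even though `mapValues()` itself leaves the key alone.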
> In parallel, we also track key-changing operations in our internal "topology
> graph" representation (which is used by our topology optimization layer), via
> `GraphNode#keyChangingOperation()`.
> Thus, we have two independent code paths that do similar things (note that
> the two are not semantically identical, so we need to be careful to get this
> right; more details below). To avoid subtle bugs, it might be worth
> refactoring the code to unify both.
> The high-level idea (without me looking into the details) would be to remove
> the `KStreamImpl#repartitionRequired` flag altogether, as we already pass a
> `GraphNode` into `KStreamImpl` and thus can access
> `isKeyChangingOperation()`. However, `isKeyChangingOperation()` only tracks
> whether the _current_ node is key-changing; if it returns `false`, we know
> nothing about its ancestors. Thus, its semantics differ from the
> `repartitionRequired` flag, which already considers ancestor information.
> Hence, we might need to traverse the `GraphNode` structure backwards to
> verify whether a parent changed the key (cf.
> `InternalStreamsBuilder#getKeyChangingParentNode()`, which I believe we could
> reuse).
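A minimal Java sketch of the backward traversal. It uses a hypothetical `SketchNode` class, not Kafka's `GraphNode`, which stores only a node-local key-changing flag and its parent nodes. The hypothetical `KeyChangeTraversal.repartitionRequired()` walks the parents breadth-first. It reports whether the node itself or any transitive ancestor changed the key, which is the ancestor-aware answer the `repartitionRequired` flag provides today. It follows the general idea described above, not the actual code of `getKeyChangingParentNode()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified graph node (NOT Kafka's GraphNode): it only knows
// whether THIS node changes the key, plus its parent nodes.
final class SketchNode {
    final String name;
    final boolean keyChangingOperation;
    final List<SketchNode> parents = new ArrayList<>();

    SketchNode(final String name, final boolean keyChangingOperation, final SketchNode... parents) {
        this.name = name;
        this.keyChangingOperation = keyChangingOperation;
        Collections.addAll(this.parents, parents);
    }
}

final class KeyChangeTraversal {
    // Breadth-first backward walk starting at `node`: returns true if the node
    // itself or any transitive ancestor is marked as key-changing.
    static boolean repartitionRequired(final SketchNode node) {
        final Deque<SketchNode> toVisit = new ArrayDeque<>();
        final Set<SketchNode> visited = new HashSet<>();
        toVisit.add(node);
        while (!toVisit.isEmpty()) {
            final SketchNode current = toVisit.poll();
            if (!visited.add(current)) {
                continue; // reached again via a different path (e.g., after a merge)
            }
            if (current.keyChangingOperation) {
                return true;
            }
            toVisit.addAll(current.parents);
        }
        return false;
    }
}
```

Given `source -> selectKey -> mapValues`, the `mapValues` node's own flag is `false`, but the traversal returns `true` because of the `selectKey` ancestor. The visited set matters once nodes have several parents (e.g., after a merge or join), where the same ancestor can be reachable via more than one path.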
> Another thing to consider is that some operators, like `repartition()`,
> explicitly reset `repartitionRequired=false` in a hard-coded way. However,
> `keyChangingOperation=false` does not carry this information: we don't know
> whether the current operator simply does not touch the key, or whether it
> _ensures_ that the data is partitioned by the current key. I.e., we need a
> new way to track this information on `GraphNode`, so that the backward
> traversal can stop when it hits a node that performs such a "reset".
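One possible way to record the "reset", sketched in Java. It assumes each node carries a three-valued `KeyEffect` instead of a single boolean. The enum, its constants, and the `PartitionNode` and `ResetAwareTraversal` classes are all hypothetical. The traversal returns `true` as soon as it finds a key-changing node. It does not continue past a node that ensures partitioning by the current key, such as one created by `repartition()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical three-valued replacement for a single "keyChangingOperation" boolean.
enum KeyEffect {
    PRESERVES_KEY,        // e.g. filter(), mapValues(): key untouched
    CHANGES_KEY,          // e.g. selectKey(), map(): key may change
    ENSURES_PARTITIONING  // e.g. repartition(): output is partitioned by the current key
}

// Hypothetical, simplified graph node (NOT Kafka's GraphNode).
final class PartitionNode {
    final String name;
    final KeyEffect keyEffect;
    final List<PartitionNode> parents = new ArrayList<>();

    PartitionNode(final String name, final KeyEffect keyEffect, final PartitionNode... parents) {
        this.name = name;
        this.keyEffect = keyEffect;
        Collections.addAll(this.parents, parents);
    }
}

final class ResetAwareTraversal {
    // Backward walk that stops following a path once it hits a node that
    // ensures partitioning by the current key (the "reset" point).
    static boolean repartitionRequired(final PartitionNode node) {
        final Deque<PartitionNode> toVisit = new ArrayDeque<>();
        final Set<PartitionNode> visited = new HashSet<>();
        toVisit.add(node);
        while (!toVisit.isEmpty()) {
            final PartitionNode current = toVisit.poll();
            if (!visited.add(current)) {
                continue; // already handled via another path
            }
            switch (current.keyEffect) {
                case CHANGES_KEY:
                    return true; // key changed with no reset between it and the start node
                case ENSURES_PARTITIONING:
                    break;       // reset: do not look at this node's ancestors
                default:
                    toVisit.addAll(current.parents); // key preserved: keep walking backwards
            }
        }
        return false;
    }
}
```

Using an enum rather than a second boolean flag means a node cannot be marked as both key-changing and partition-ensuring at the same time. With several parents, only the paths that pass through a reset node are cut off. A merge of a repartitioned stream with a re-keyed one therefore still requires repartitioning.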
> Overall, I believe it would be good to do this rewrite (based on an educated
> guess...), but after some POC coding it might also turn out that it's not a
> good idea and not worth doing, as it could add more problems than it solves;
> in that case we might just need to close this ticket as "won't fix". Only a
> PR could tell for sure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
