Matthias J. Sax created KAFKA-18692:
---------------------------------------

             Summary: Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation"
                 Key: KAFKA-18692
                 URL: https://issues.apache.org/jira/browse/KAFKA-18692
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Matthias J. Sax


In `KStreamImpl`, we use a flag `repartitionRequired` that tracks whether an
operator (or one of its transitive ancestors) modified the key. When a
key-dependent operation like an aggregation or a join is executed, this flag
tells us whether we need to insert an auto-repartition step.
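A minimal Java sketch of the flag-propagation idea, under simplifying assumptions. `FlagStream` and its methods are hypothetical stand-ins, not the real `KStreamImpl` API: a key-changing operator sets the flag, a key-preserving operator inherits it from its parent, an explicit repartition resets it, and a key-dependent operator only reads it.

```java
// Hypothetical, simplified model of a stream object that carries a
// "repartitionRequired" flag from parent to child. NOT the real KStreamImpl code.
final class FlagStream {
    // true if this operator or any upstream operator changed the key
    // (and no explicit repartition happened in between)
    final boolean repartitionRequired;

    private FlagStream(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    // A source topic is assumed to be partitioned by its key.
    static FlagStream source() {
        return new FlagStream(false);
    }

    // Key-changing operator (in the spirit of selectKey()/map()).
    FlagStream selectKey() {
        return new FlagStream(true);
    }

    // Key-preserving operator (in the spirit of mapValues()/filter()):
    // the parent's flag is inherited, so an upstream key change is not forgotten.
    FlagStream mapValues() {
        return new FlagStream(repartitionRequired);
    }

    // Explicit repartition(): hard-coded reset of the flag.
    FlagStream repartition() {
        return new FlagStream(false);
    }

    // Key-dependent operator (in the spirit of groupByKey()/join()):
    // reports whether an auto-repartition step would have to be inserted.
    boolean needsAutoRepartitionForKeyedOp() {
        return repartitionRequired;
    }
}
```

For example, `FlagStream.source().selectKey().mapValues()` still requires a repartition, because the key change is carried forward through the key-preserving step.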

In parallel, we also track key-changing operations in our internal "topology
graph" representation (which is used by our topology optimization layer), via
`GraphNode#keyChangingOperation()`.
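For contrast, a hypothetical minimal node type (`SimpleGraphNode`, not the actual `GraphNode` class) that, like the graph representation, only records whether the node itself is key-changing. This is a sketch of the per-node semantics only; the real class carries much more state.

```java
import java.util.List;

// Hypothetical, minimal stand-in for a topology-graph node. Only the
// per-node key-changing flag and the parent links are modeled here.
final class SimpleGraphNode {
    final String name;
    // Describes THIS node only; it says nothing about the node's ancestors.
    final boolean keyChangingOperation;
    final List<SimpleGraphNode> parents;

    SimpleGraphNode(String name, boolean keyChangingOperation, SimpleGraphNode... parents) {
        this.name = name;
        this.keyChangingOperation = keyChangingOperation;
        this.parents = List.of(parents);
    }

    boolean isKeyChangingOperation() {
        return keyChangingOperation;
    }
}
```

Building `source -> selectKey -> mapValues` with this type yields `isKeyChangingOperation() == false` for the `mapValues` node, even though its input is no longer partitioned by the current key.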

Thus, we have two independent code paths that do similar things (note that
the two are not semantically identical, so we need to be careful to get this
right; more details below). To avoid subtle bugs, it might be worth
refactoring the code to unify both.

The high-level idea (without me having looked into the details) would be to
remove the `KStreamImpl#repartitionRequired` flag altogether: we already pass a
`GraphNode` into `KStreamImpl` and can thus access `isKeyChangingOperation()`.
However, `isKeyChangingOperation()` only tracks whether the _current_ node is
key-changing; if it returns `false`, we don't know anything about its
ancestors. Thus, its semantics differ from the `repartitionRequired` flag,
which already takes ancestor information into account. Hence, we might need to
traverse the `GraphNode` structure backwards to check whether a parent changed
the key (cf. `InternalStreamsBuilder#getKeyChangingParentNode()`, which I
believe we could re-use).
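A sketch of the backward traversal, assuming a simplified DAG of hypothetical `TraversalNode` objects (this does not reproduce `InternalStreamsBuilder#getKeyChangingParentNode()`, whose exact behavior is not shown here). The check returns `true` if the node itself or any transitive ancestor changed the key; for a node with several parents (e.g., after a merge), any key-changing path counts, mirroring how a flag would be OR-ed together.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical node type for illustration; not the real Kafka Streams GraphNode.
final class TraversalNode {
    final boolean keyChangingOperation;
    final List<TraversalNode> parents;

    TraversalNode(boolean keyChangingOperation, TraversalNode... parents) {
        this.keyChangingOperation = keyChangingOperation;
        this.parents = List.of(parents);
    }
}

final class KeyChangeCheck {
    // Walks the graph backwards (child -> parents) instead of propagating a
    // flag forwards. Assumes the graph is acyclic, as a topology graph is.
    static boolean keyChangedUpstream(TraversalNode node) {
        return visit(node, new HashSet<>());
    }

    private static boolean visit(TraversalNode node, Set<TraversalNode> visited) {
        if (!visited.add(node)) {
            // Already fully explored via another path without finding a key change.
            return false;
        }
        if (node.keyChangingOperation) {
            return true;
        }
        for (TraversalNode parent : node.parents) {
            if (visit(parent, visited)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that this check alone would still report `true` downstream of an explicit repartition step that follows a key change, since the per-node flags cannot express that partitioning was re-established.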

Another thing to consider is that some operators like `repartition()`
explicitly reset `repartitionRequired=false` in a hard-coded way. However,
`keyChangingOperation=false` does not carry this information: we don't know if
the current operator just does not touch the key, or if it _ensures_ that the
data is partitioned by the current key. I.e., we need a new way to track this
information on `GraphNode`, so that the backward traversal can stop when it
hits such a "reset" node.
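One possible shape for such tracking, as a hedged sketch: a hypothetical `ensuresPartitionedByKey` marker (an assumed new field, not an existing `GraphNode` member) on a hypothetical `MarkedNode` type. The backward traversal stops at a marked node, because everything downstream of it is partitioned by the current key regardless of earlier key changes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical node type. `ensuresPartitionedByKey` is an ASSUMED new flag
// marking operators (like repartition()) that re-establish partitioning by key.
final class MarkedNode {
    final boolean keyChangingOperation;
    final boolean ensuresPartitionedByKey;
    final List<MarkedNode> parents;

    MarkedNode(boolean keyChangingOperation, boolean ensuresPartitionedByKey, MarkedNode... parents) {
        this.keyChangingOperation = keyChangingOperation;
        this.ensuresPartitionedByKey = ensuresPartitionedByKey;
        this.parents = List.of(parents);
    }
}

final class RepartitionCheck {
    // Returns true if some upstream path reaches a key-changing node before
    // reaching a node that re-establishes partitioning by the current key.
    static boolean repartitionRequired(MarkedNode node) {
        return visit(node, new HashSet<>());
    }

    private static boolean visit(MarkedNode node, Set<MarkedNode> visited) {
        if (!visited.add(node)) {
            return false; // result for this node was already computed as false
        }
        if (node.ensuresPartitionedByKey) {
            return false; // "reset" node: stop the backward traversal on this path
        }
        if (node.keyChangingOperation) {
            return true;
        }
        for (MarkedNode parent : node.parents) {
            if (visit(parent, visited)) {
                return true;
            }
        }
        return false;
    }
}
```

Checking the reset marker before the key-changing flag is a design choice of this sketch: a node that both changes the key and guarantees partitioning by the new key is treated as a reset.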

Overall, I believe it would be good to do this rewrite (based on an educated
guess...), but after some POC coding it might also turn out that it's not a
good idea and not worth doing, as it could add more problems than it solves;
in that case we might just close this ticket as "won't fix". Only a PR can
tell for sure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
