Matthias J. Sax created KAFKA-18692:
---------------------------------------

             Summary: Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation"
                 Key: KAFKA-18692
                 URL: https://issues.apache.org/jira/browse/KAFKA-18692
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Matthias J. Sax


In `KStreamImpl`, we use a flag `repartitionRequired` to track whether an operator (or one of its transitive ancestors) modified the key, so that when a key-dependent operation like an aggregation or join is executed, we know whether we need to insert an auto-repartition step.

In parallel, we also track key-changing operations in our internal "topology graph" representation (which is used by our topology optimization layer), via GraphNode#keyChangingOperation(). Thus, we have two independent code paths that do similar things (note: the two are not semantically identical, so we need to be careful to get this right; more details below).

To avoid subtle bugs, it might be worth refactoring the code to unify both. The high-level idea would be (without me having looked into the details) to remove the `KStreamImpl#repartitionRequired` flag altogether, as we already pass a `GraphNode` into `KStreamImpl` and thus can access `isKeyChangingOperation()`.

However, `isKeyChangingOperation()` only tracks whether the _current_ node is key-changing, and if it returns `false` we don't know anything about its ancestors. Thus, its semantics differ from those of the `repartitionRequired` flag, which already takes ancestor information into account. Hence, we might need to traverse the `GraphNode` structure backwards to verify whether a parent changed the key (cf. `InternalStreamsBuilder#getKeyChangingParentNode()`, which I believe we could reuse).

Another thing to consider is that some operators, like `repartition()`, explicitly reset `repartitionRequired=false` in a hard-coded way.
However, `keyChangingOperation=false` does not carry this information: we don't know whether the current operator simply does not touch the key, or whether it _ensures_ that we are partitioned by the current key. I.e., we need a new way to track this information on `GraphNode`, so that we can stop the backward traversal when we hit a node that does this "reset".

Overall, I believe it would be good to do this rewrite (based on an educated guess...), but it might also turn out (after some POC coding) that it's not a good idea and not worth doing, as it could add more problems than it solves; in that case we might just need to close this ticket as "won't fix". Only a PR could tell for sure.
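
To make the idea of a backward traversal with a "reset" marker more concrete, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `Node`, its `ensuresPartitioning` field, and the `repartitionRequired(...)` helper are hypothetical stand-ins invented for illustration, and treating a node with several parents as "repartition needed if any parent path changed the key" is an assumption, not something verified against the code base.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: a simplified stand-in for the topology graph, NOT Kafka's GraphNode.
final class RepartitionCheckSketch {

    static final class Node {
        final String name;
        // Plays the role of isKeyChangingOperation(): true only if THIS node changes the key.
        final boolean keyChanging;
        // Hypothetical new marker: this node guarantees partitioning by the current key
        // (what repartition() expresses today by resetting repartitionRequired=false).
        final boolean ensuresPartitioning;
        final List<Node> parents;

        Node(String name, boolean keyChanging, boolean ensuresPartitioning, Node... parents) {
            this.name = name;
            this.keyChanging = keyChanging;
            this.ensuresPartitioning = ensuresPartitioning;
            this.parents = new ArrayList<>(Arrays.asList(parents));
        }
    }

    // Walks from 'start' towards the sources. Returns true if a key-changing node is
    // reachable without first passing a node that ensures partitioning.
    static boolean repartitionRequired(Node start) {
        Deque<Node> pending = new ArrayDeque<>();
        Set<Node> visited = new HashSet<>();
        pending.push(start);
        while (!pending.isEmpty()) {
            Node node = pending.pop();
            if (!visited.add(node)) {
                continue; // already inspected via another path
            }
            if (node.ensuresPartitioning) {
                continue; // "reset" node: stop the traversal on this branch
            }
            if (node.keyChanging) {
                return true; // key changed and nothing downstream reset it
            }
            for (Node parent : node.parents) {
                pending.push(parent);
            }
        }
        return false; // reached the sources without finding a key change
    }
}
```

In this sketch, a chain `source -> selectKey -> mapValues` would report that a repartition is required at `mapValues`, while appending a `repartition` node (marked with `ensuresPartitioning`) followed by a `filter` would report that none is required at `filter`, which matches what the ancestor-aware `repartitionRequired` flag is described as tracking today.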