Hi Guozhang,

Thank you very much for your comments.

Regarding 1: the extra state store is only needed for outer joins, since that is the only case where we have non-joining records that would need to be emitted when the window closes, right? If we do decide to go with an outer-join implementation, I will make sure to include the extra state store as well. Thank you for pointing it out.

Regarding 2: As the self-join is only a physical optimization over an inner join whose two arguments are the same entity, it should return the same results as the inner join. We wouldn't want a user who upgrades and enables the optimization to suddenly see their joins behave differently and produce different results. As an example, consider the records <A,1> and <A,2>, where A is the key, the number is the value, and both are strings. Assume these records are piped into an input topic, and assume we have a self-join (not optimized, so using the inner join implementation) whose joiner concatenates the values. The output of the join after processing the first record is: <A,11>. The output of the join after processing the second record is: <A,21>, <A,12>, <A,22>. So, for an inner join whose two arguments are the same stream, a record does join with itself. As a user, I would expect the self-join optimization to produce the same results. What do you think?

Regarding 3: I did a small experiment and I think the changes I made are backwards compatible. Basically, I created a topology without the optimization, had it process some data, and killed it. Then I started it again with the optimization turned on, and processing resumed fine: there was no exception, no extra state stores were created, and the join results made sense. The optimization keeps the same state store and doesn't change the names or indices of nodes in the topology. I will, however, need to add a case for self-joins to the upgrade system tests to make sure that things don't break. Is this sufficient?

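To make the example under point 2 concrete, here is a rough Python simulation of it. This is only a sketch and not Kafka Streams code: the function name `self_inner_join` and the two dict-based stores are hypothetical, it assumes all records fall within the join window, and it assumes each record is processed by the left side of the join first and then by the right side.

```python
from collections import defaultdict


def self_inner_join(records, joiner):
    """Simulate an unoptimized inner join whose two inputs are the same stream.

    Assumptions (illustrative only): every record is inside the join window,
    and each record is handled by the left join side, then the right one.
    Returns one list of emitted (key, value) pairs per input record.
    """
    left_store = defaultdict(list)   # values seen by the left join side
    right_store = defaultdict(list)  # values seen by the right join side
    outputs = []
    for key, value in records:
        emitted = []
        # Left side: store the record, then join it against the right store.
        left_store[key].append(value)
        for other in right_store[key]:
            emitted.append((key, joiner(value, other)))
        # Right side: store the record, then join it against the left store,
        # which already contains this same record.
        right_store[key].append(value)
        for other in left_store[key]:
            emitted.append((key, joiner(other, value)))
        outputs.append(emitted)
    return outputs


# The joiner concatenates the two string values, as in the example.
results = self_inner_join([("A", "1"), ("A", "2")], lambda l, r: l + r)
for emitted in results:
    print(emitted)
```

Under these assumptions the first record joins with itself, and the second record joins with both the first record and itself, which is the behaviour I would expect the optimized self-join to preserve.
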
Regarding the config, one option is to have one config per optimization, but I am worried that this will become unwieldy if we add many optimizations in the future, and it also requires users to know about each optimization in order to benefit from it. An alternative is to assume that if TOPOLOGY_OPTIMIZATION_CONFIG is set to `all`, then all optimizations are applied. If the user doesn't want a specific optimization, they need to turn that one off explicitly. So we would still have a config per optimization, but they would be on by default.

Best,
Vicky

On Tue, Aug 9, 2022 at 7:03 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Vicky,
>
> Thanks for the KIP! I made a quick pass and here are some quick thoughts:
>
> 1. Store Implementation: this may be not directly related to the KIP itself
> since its all internal, but the stream-stream join state store
> implementation has been changed in
> https://issues.apache.org/jira/browse/KAFKA-10847, in which we added a
> separate store to maintain all the records that have not found a match yet,
> and would emit them when time passed for left/outer joins. In this
> optimization, I think we can still go with a single store but we need to
> make sure we do not regress on KAFKA-10847, i.e. for records not finding a
> match, we should also emit them when time passed by, this would likely rely
> on the ability to range-over the only store on its "expired" records. A
> good reference would be in the recent works to allow emitting final for
> windowed aggregations (cc @Hao Li <lihaos...@gmail.com> who can provide
> some more references).
>
> 2. Join Semantics and Outer-Joins: I think we need to clarify for any
> single stream record, would itself also be considered a "match" for itself,
> OR should we consider only a different record but with the same key and
> within the join window a "match" for itself. If it's the former, then I
> agree that outer-joins (even left-joins, right?) would not make sense since
> we would always find at least a match for any record; if it's the latter,
> then outer/left joins still make sense and we would need to consider the
> store implementation as stated in 1) above. Personally, I think the latter
> is better --- I know it's a bit away from the RDBMS self-join semantics but
> for RDBMS self-joins are usually not on PKs, but on FKs so I think its
> semantics is less relevant to what we are considering here for windowed
> stream-stream joins which are still on PKs.
>
> 3. Compatibility: first of all, I think we should introduce new values for
> the TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in addition
> to `all` and `none`, this is also what we discussed before to keep
> compatibility. But for applications that are already running, we'd also
> need to make sure that after a rolling bounce with this config value
> changed, we would not break the app. That involves: a) the store names (and
> hence the changelog names) should not change -- when we use suffixes, we
> should make sure they do not change by burning some suffixes as well, b)
> the processor names, similar to store names, c) store formats, if we ever
> change the store formats, we need to consider a live upgrade path as well.
>
> Please let me know your thoughts.
>
> Guozhang
>
>
> On Tue, Aug 2, 2022 at 11:31 AM Vasiliki Papavasileiou
> <vpapavasile...@confluent.io.invalid> wrote:
>
> > Hello everyone,
> >
> > I would like to start the discussion for KIP-862: Implement self-join
> > optimization
> >
> > The KIP can be found here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
> >
> > Any suggestions are more than welcome.
> >
> > Many thanks,
> > Vicky
> >
>
>
> --
> -- Guozhang
>