Thanks for the KIP, Vicky!

Re 1/2, I agree with what you both worked out.

Re 3: It sounds like you were able to preserve backward compatibility, so I don’t think you need to add any new configs. I think you can just switch it on if people specify “all”.

Thanks!
-John

On Thu, Aug 11, 2022, at 11:27, Guozhang Wang wrote:
> Thanks Vicky for your reply!
>
> Re 1/2): I think you make a great point here about adhering to the existing implementation; I'm convinced. In that case we do not need to consider left/outer joins, and hence do not need to worry about the extra store in the implementation.
>
> Re 3): I'm curious how compatibility is preserved, since with optimizations turned on we would use fewer stores, and hence the store name suffixes would change. In your experiment, did you explicitly specify the store names, e.g. via Materialized? I'd be glad if it turns out to really be conveniently backward compatible, and I will rest my concerns :)
>
>
> Guozhang
>
> On Thu, Aug 11, 2022 at 4:44 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:
>
>> Hi Guozhang,
>>
>> Thank you very much for your comments.
>>
>> Regarding 1: the extra state store is only needed in outer joins, since that's the only case where we have non-joining records that would need to be emitted when the window closes, right? If we do decide to go with an outer-join implementation, I will make sure to have the extra state store as well. Thank you for pointing it out.
>>
>> Regarding 2: As the self-join is only a physical optimization over an inner join whose two arguments are the same entity, it should return the same results as the inner join. We wouldn't want users who upgrade and enable the optimization to suddenly see that their joins behave differently and produce different results.
>> As an example, consider the records <A,1> and <A,2>, where A is the key, the number is the value, and both are strings. Assume these records are piped into an input topic.
>> And assume we have a self-join (not optimized, so using the inner join implementation) whose joiner concatenates the values.
>> The output of the join after processing the first record is: <A,11>.
>> The output of the join after processing the second record is: <A,21>, <A,12>, <A,22>.
>> So, for an inner join whose two arguments are the same stream, a record does join with itself. And as a user, I would expect the self-join optimization to produce the same results. What do you think?
>>
>> Regarding 3: I did a small experiment and I think the changes I made are backward compatible. Basically, I created a topology without the optimization, had it process some data, and killed it. Then I started it again with the optimization turned on, and processing resumed fine, in that there was no exception, no extra state stores were created, and the join results made sense. The optimization keeps the same state store and doesn't change the names or indices of nodes in the topology. I will, however, need to add a case for self-joins in the upgrade system tests to make sure that things don't break. Is this sufficient?
>> Regarding the config, one way to go would be to have one config per optimization, but I am worried that this will get unwieldy if we have a lot of them in the future, and it also requires users to know about the optimizations to be able to benefit from them. Another alternative is to assume that if TOPOLOGY_OPTIMIZATION_CONFIG is on (`all`), then all optimizations are applied. If the user doesn't want a specific optimization, then they need to turn that one off. So, we will have a config per optimization, but they will be on by default.
>>
>> Best,
>> Vicky
>>
>> On Tue, Aug 9, 2022 at 7:03 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Vicky,
>> >
>> > Thanks for the KIP! I made a quick pass and here are some quick thoughts:
>> >
>> > 1. Store Implementation: this may not be directly related to the KIP itself since it's all internal, but the stream-stream join state store implementation has been changed in https://issues.apache.org/jira/browse/KAFKA-10847, in which we added a separate store to maintain all the records that have not found a match yet, and emit them once time has passed for left/outer joins. In this optimization, I think we can still go with a single store, but we need to make sure we do not regress on KAFKA-10847, i.e. records not finding a match should also be emitted once time has passed. This would likely rely on the ability to range over the single store for its "expired" records. A good reference would be the recent work to allow emitting final results for windowed aggregations (cc @Hao Li <lihaos...@gmail.com> who can provide some more references).
>> >
>> > 2. Join Semantics and Outer-Joins: I think we need to clarify whether, for any single stream record, the record itself is also considered a "match" for itself, OR whether only a different record with the same key and within the join window is considered a "match". If it's the former, then I agree that outer-joins (even left-joins, right?) would not make sense, since we would always find at least one match for any record; if it's the latter, then outer/left joins still make sense and we would need to consider the store implementation as stated in 1) above. Personally, I think the latter is better. I know it's a bit away from the RDBMS self-join semantics, but RDBMS self-joins are usually not on PKs but on FKs, so I think those semantics are less relevant to what we are considering here for windowed stream-stream joins, which are still on PKs.
>> >
>> > 3. Compatibility: first of all, I think we should introduce new values for the TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in addition to `all` and `none`; this is also what we discussed before to keep compatibility. But for applications that are already running, we'd also need to make sure that after a rolling bounce with this config value changed, we would not break the app. That involves:
>> > a) the store names (and hence the changelog names) should not change; when we use suffixes, we should make sure they do not change by burning some suffixes as well,
>> > b) the processor names, similar to store names,
>> > c) store formats: if we ever change the store formats, we need to consider a live upgrade path as well.
>> >
>> > Please let me know your thoughts.
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Aug 2, 2022 at 11:31 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:
>> >
>> > > Hello everyone,
>> > >
>> > > I would like to start the discussion for KIP-862: Implement self-join optimization
>> > >
>> > > The KIP can be found here:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
>> > >
>> > > Any suggestions are more than welcome.
>> > >
>> > > Many thanks,
>> > > Vicky
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
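The inner self-join semantics Vicky describes under "Regarding 2" can be reproduced with a small in-memory model. This is a minimal sketch, not the Kafka Streams implementation: it assumes a single key, assumes every record falls inside the join window, and assumes each record is first buffered and probed on the left side and then on the right side, which yields the output order given in her example. The class SelfJoinSemanticsSketch, the method innerSelfJoin, and the recordMatchesItself flag are hypothetical names; the flag switches between the two readings of "match" raised in point 2 of Guozhang's first reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of a windowed inner self-join for one key.
// Assumes all records fall inside the join window; no Kafka Streams APIs are used.
final class SelfJoinSemanticsSketch {

    static List<String> innerSelfJoin(List<String> values,
                                      BinaryOperator<String> joiner,
                                      boolean recordMatchesItself) {
        List<String> leftStore = new ArrayList<>();
        List<String> rightStore = new ArrayList<>();
        List<String> output = new ArrayList<>();
        for (String value : values) {
            // Left side: buffer the record, then probe the right-side buffer,
            // which does not contain this record yet.
            leftStore.add(value);
            for (String right : rightStore) {
                output.add(joiner.apply(value, right));
            }
            // Right side: buffer the same record, then probe the left-side buffer.
            // Its last entry is the current record, which is skipped when a
            // record must not match itself.
            rightStore.add(value);
            int limit = recordMatchesItself ? leftStore.size() : leftStore.size() - 1;
            for (int i = 0; i < limit; i++) {
                output.add(joiner.apply(leftStore.get(i), value));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        BinaryOperator<String> concat = (left, right) -> left + right;
        List<String> records = List.of("1", "2"); // values of <A,1> and <A,2>
        System.out.println(innerSelfJoin(records, concat, true));  // [11, 21, 12, 22]
        System.out.println(innerSelfJoin(records, concat, false)); // [21, 12]
    }
}
```

With recordMatchesItself set to true, the model returns [11, 21, 12, 22], the same outputs as Vicky's example. With false, it returns [21, 12]. In the first case every record produces at least its own pair, which is why, under that reading, a left or outer self-join would never have an unmatched record to emit.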