Thanks Vicky, I do not have any further comments about the KIP.
Guozhang

On Tue, Aug 30, 2022 at 8:21 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:

> Hi Guozhang,
>
> That's an excellent idea, I will make the changes. I was also going back and forth on whether to have a specific config for each optimization, but I feel your approach gives us the best of both worlds.
>
> Thank you,
> Vicky
>
> On Sun, Aug 28, 2022 at 6:20 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Vicky,
> >
> > I made a quick pass on your WIP PR, and now I understand and agree that compatibility is indeed preserved: we get the optimized topology in a second pass, so the original topology's naming suffixes have already been "used and burned" in the first pass.
> >
> > Regarding the configuration pattern, I still have a bit of concern: if we follow this pattern of introducing a new config for each optimization rule, in the future we would have a lot of configs (one per rule) inside StreamsConfig. I thought about this back and forth again and still feel that this may not be what we want. I think instead we can change the existing `TOPOLOGY_OPTIMIZATION_CONFIG` to accept a comma-separated list of strings (this aligns with other similar configs as well), so that users can choose either fine-grained or coarse-grained control for different scenarios, e.g.:
> >
> > * I just want to enable all rules, or none: "all", "none".
> > * I know my app was created with Kafka version X, and I just want to apply only the rules that already existed as of version X: "versionX". I just made this one up for future use cases, since we discussed it in the original KIP when we introduced "TOPOLOGY_OPTIMIZATION_CONFIG"; we do not need to include it in this KIP.
> > * I know my app is compatible with specific rules A/B/C, and I just want to always enable those and not others: "ruleA,ruleB,ruleC".
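The following is a minimal Java sketch of the comma-separated `TOPOLOGY_OPTIMIZATION_CONFIG` value proposed in this message. The class, the enum, and every rule identifier except `self-join` are hypothetical placeholders, not StreamsConfig's real constants. The sketch also rejects `all`/`none` when mixed with any other value, not only with each other. That is an assumption of the sketch, stricter than the thread's "some strings cannot coexist".

```java
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical parser for a comma-separated TOPOLOGY_OPTIMIZATION_CONFIG value.
// Rule identifiers are placeholders (only "self-join" comes from the thread);
// they are not the real StreamsConfig constants.
final class OptimizationConfigSketch {

    enum Rule {
        REUSE_KTABLE_SOURCE("reuse-ktable-source"),
        MERGE_REPARTITION("merge-repartition"),
        SELF_JOIN("self-join");

        final String id;

        Rule(String id) {
            this.id = id;
        }

        static Rule fromId(String id) {
            for (Rule rule : values()) {
                if (rule.id.equals(id)) {
                    return rule;
                }
            }
            throw new IllegalArgumentException("Unknown optimization rule: " + id);
        }
    }

    // Returns the rules enabled by a value such as "all", "none" or "self-join,merge-repartition".
    static Set<Rule> parse(String value) {
        Set<String> tokens = new LinkedHashSet<>();
        for (String raw : value.split(",")) {
            String token = raw.trim();
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        if (tokens.isEmpty()) {
            throw new IllegalArgumentException("No optimization value given");
        }
        boolean hasAll = tokens.contains("all");
        boolean hasNone = tokens.contains("none");
        // Assumption of this sketch: the coarse-grained switches "all" and "none"
        // may not be mixed with each other or with individual rule names.
        if ((hasAll || hasNone) && tokens.size() > 1) {
            throw new IllegalArgumentException("'all' and 'none' cannot be combined with other values: " + value);
        }
        if (hasAll) {
            return EnumSet.allOf(Rule.class);
        }
        if (hasNone) {
            return EnumSet.noneOf(Rule.class);
        }
        Set<Rule> rules = EnumSet.noneOf(Rule.class);
        for (String token : tokens) {
            rules.add(Rule.fromId(token));
        }
        return rules;
    }

    public static void main(String[] args) {
        System.out.println(parse("all"));
        System.out.println(parse("none"));
        System.out.println(parse("self-join"));
        try {
            parse("none,all");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, existing `none` and `all` users keep their current behavior. A user who wants only the new rule sets the value to `self-join`.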
> >
> > So far we only have a few rules: a) reuse the source topic as the changelog topic for KTables, b) merge duplicate repartition topics, c) self-join (this KIP). So I suggest that in this KIP we just make `TOPOLOGY_OPTIMIZATION_CONFIG` accept a list of strings, but 1) check that some strings cannot coexist (e.g. `none` and `all`), and 2) add a new string value for self-join itself. In this way:
> >
> > * People who chose `none` before will not be impacted.
> > * People who chose `all` before will get this optimization by default, and it's backward compatible so it's okay; they also get what they meant: I just want "all" :)
> > * Advanced users who read about this KIP and want just this rule but not the others will change their config from `none` to `self-join`.
> >
> > WDYT?
> >
> >
> > Guozhang
> >
> > On Fri, Aug 12, 2022 at 7:25 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for the KIP, Vicky!
> > >
> > > Re 1/2, I agree with what you both worked out.
> > >
> > > Re 3: It sounds like you were able to preserve backward compatibility, so I don’t think you need to add any new configs. I think you can just switch it on if people specify “all”.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Thu, Aug 11, 2022, at 11:27, Guozhang Wang wrote:
> > > > Thanks Vicky for your reply!
> > > >
> > > > Re 1/2): I think you have a great point here about adhering to the existing implementation; I'm convinced. In that case we do not need to consider left/outer joins, and hence do not need to worry about the extra store in the implementation.
> > > >
> > > > Re 3): I'm curious how compatibility is preserved, since with optimizations turned on we would use fewer stores and hence the store name suffixes would change. In your experiment, did you explicitly specify the store names, e.g. via Materialized?
> > > > I'd be glad if it turns out to really be conveniently backward compatible, and I can rest my concerns :)
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Aug 11, 2022 at 4:44 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Thank you very much for your comments.
> > > > >
> > > > > Regarding 1: the extra state store is only needed in outer joins, since that's the only case where we have non-joining records that need to be emitted when the window closes, right? If we do decide to go with an outer-join implementation, I will make sure to add the extra state store as well. Thank you for pointing it out.
> > > > >
> > > > > Regarding 2: As the self-join is only a physical optimization over an inner join whose two arguments are the same entity, it should return the same results as the inner join. We wouldn't want a user who upgrades and enables the optimization to suddenly see their joins behave differently and produce different results.
> > > > > As an example, consider the records <A,1> and <A,2>, where A is the key and the number is the value, and both are strings. Assume these records are piped into an input topic, and assume we have a self-join (not optimized, so using the inner join implementation) whose joiner concatenates the values.
> > > > > The output of the join after processing the first record is: <A,11>.
> > > > > The output of the join after processing the second record is: <A,21>, <A,12>, <A,22>.
> > > > > So, for an inner join whose two arguments are the same stream, a record does join with itself. And as a user, I would expect the self-join optimization to produce the same results. What do you think?
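Vicky's <A,1>/<A,2> example can be reproduced with the toy Java model below. It is not Kafka Streams code. `SelfJoinSimulation` models the unoptimized inner join: the same stream feeds a "this" side and an "other" side, and each side has its own store. `SingleStoreSelfJoin` is a hypothetical one-store variant that emits the same results in the same order. The model assumes every record falls inside a single join window. It also assumes that, for each record, the "this" side runs before the "other" side. That processing order was chosen because it reproduces the output order quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model, not Kafka Streams code, of an unoptimized inner self-join: the same
// stream feeds a "this" side and an "other" side, each with its own store.
// Assumptions: every record falls inside one join window, and for each record
// the "this" side runs before the "other" side.
final class SelfJoinSimulation {
    private final Map<String, List<String>> thisStore = new HashMap<>();
    private final Map<String, List<String>> otherStore = new HashMap<>();
    private final BinaryOperator<String> joiner;
    final List<String> output = new ArrayList<>();

    SelfJoinSimulation(BinaryOperator<String> joiner) {
        this.joiner = joiner;
    }

    void process(String key, String value) {
        // "this" side: store the record, then probe the other side's store.
        thisStore.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String otherValue : otherStore.getOrDefault(key, List.of())) {
            output.add("<" + key + "," + joiner.apply(value, otherValue) + ">");
        }
        // "other" side: store the record, then probe the "this" store, which
        // already holds the record itself, so the record joins with itself.
        otherStore.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String thisValue : thisStore.getOrDefault(key, List.of())) {
            output.add("<" + key + "," + joiner.apply(thisValue, value) + ">");
        }
    }

    public static void main(String[] args) {
        SelfJoinSimulation twoStores = new SelfJoinSimulation((left, right) -> left + right);
        twoStores.process("A", "1");
        System.out.println(twoStores.output); // [<A,11>]
        twoStores.output.clear();
        twoStores.process("A", "2");
        System.out.println(twoStores.output); // [<A,21>, <A,12>, <A,22>]

        SingleStoreSelfJoin oneStore = new SingleStoreSelfJoin((left, right) -> left + right);
        oneStore.process("A", "1");
        oneStore.process("A", "2");
        System.out.println(oneStore.output); // [<A,11>, <A,21>, <A,12>, <A,22>]
    }
}

// Hypothetical single-store variant: one store, emitting the same results in
// the same order as the two-store model above.
final class SingleStoreSelfJoin {
    private final Map<String, List<String>> store = new HashMap<>();
    private final BinaryOperator<String> joiner;
    final List<String> output = new ArrayList<>();

    SingleStoreSelfJoin(BinaryOperator<String> joiner) {
        this.joiner = joiner;
    }

    void process(String key, String value) {
        List<String> earlier = store.computeIfAbsent(key, k -> new ArrayList<>());
        for (String u : earlier) { // new record as the left argument
            output.add("<" + key + "," + joiner.apply(value, u) + ">");
        }
        for (String u : earlier) { // new record as the right argument
            output.add("<" + key + "," + joiner.apply(u, value) + ">");
        }
        output.add("<" + key + "," + joiner.apply(value, value) + ">"); // joins with itself
        earlier.add(value);
    }
}
```

The two models agree because, in the two-store version, every record finds itself in the "this" store. That is why Vicky argues the optimized self-join must also count a record as a match for itself.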
> > > > >
> > > > > Regarding 3: I did a small experiment, and I think the changes I made are backwards compatible. Basically, I created a topology without the optimization, had it process some data, and killed it. Then I started it again with the optimization turned on, and processing resumed fine: there was no exception, no extra state stores were created, and the join results made sense. The optimization keeps the same state store and doesn't change the names or indices of nodes in the topology. I will, however, need to add a case for self-joins in the upgrade system tests to make sure that things don't break. Is this sufficient?
> > > > > Regarding the config, one way to go would be to have one config per optimization, but I am worried that this will get unwieldy if we have a lot of them in the future, and it also requires users to know about the optimizations to be able to benefit from them. Another alternative is to assume that if TOPOLOGY_OPTIMIZATION_CONFIG is on (`all`), then all optimizations are applied. If the user doesn't want a specific optimization, they need to turn that one off. So, we would have a config per optimization, but they would be on by default.
> > > > >
> > > > > Best,
> > > > > Vicky
> > > > >
> > > > > On Tue, Aug 9, 2022 at 7:03 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Vicky,
> > > > > >
> > > > > > Thanks for the KIP! I made a quick pass and here are some quick thoughts:
> > > > > >
> > > > > > 1. Store Implementation: this may not be directly related to the KIP itself, since it's all internal, but the stream-stream join state store implementation was changed in https://issues.apache.org/jira/browse/KAFKA-10847. There we added a separate store that keeps all the records that have not found a match yet and emits them once time has passed, for left/outer joins. In this optimization, I think we can still go with a single store, but we need to make sure we do not regress on KAFKA-10847, i.e. records that do not find a match should also be emitted once time has passed. This would likely rely on the ability to range over the single store for its "expired" records. A good reference would be the recent work to allow emitting final results for windowed aggregations (cc @Hao Li <lihaos...@gmail.com> who can provide some more references).
> > > > > >
> > > > > > 2. Join Semantics and Outer-Joins: I think we need to clarify whether any single stream record is also considered a "match" for itself, OR whether only a different record with the same key and within the join window counts as a "match" for it. If it's the former, then I agree that outer joins (even left joins, right?) would not make sense, since we would always find at least one match for any record; if it's the latter, then outer/left joins still make sense and we would need to consider the store implementation as stated in 1) above.
> > > > > > Personally, I think the latter is better. I know it departs a bit from RDBMS self-join semantics, but in an RDBMS, self-joins are usually not on PKs but on FKs, so I think those semantics are less relevant to what we are considering here for windowed stream-stream joins, which are still on PKs.
> > > > > >
> > > > > > 3. Compatibility: first of all, I think we should introduce new values for TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in addition to `all` and `none`; this is also what we discussed before to keep compatibility. But for applications that are already running, we'd also need to make sure that after a rolling bounce with this config value changed, we would not break the app. That involves: a) the store names (and hence the changelog names) should not change; when we use suffixes, we should make sure they do not change by burning some suffixes as well, b) the processor names, similar to store names, c) store formats: if we ever change the store formats, we need to consider a live upgrade path as well.
> > > > > >
> > > > > > Please let me know your thoughts.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Aug 2, 2022 at 11:31 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello everyone,
> > > > > > >
> > > > > > > I would like to start the discussion for KIP-862: Implement self-join optimization.
> > > > > > >
> > > > > > > The KIP can be found here:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
> > > > > > >
> > > > > > > Any suggestions are more than welcome.
> > > > > > >
> > > > > > > Many thanks,
> > > > > > > Vicky
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> >
> >
> > --
> > -- Guozhang
> >
>

--
-- Guozhang
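Several messages above depend on auto-generated store and processor names keeping their index suffixes when the optimization is enabled. Examples are Guozhang's "used and burned" remark and point 3 of his Aug 9 review. The toy Java model below illustrates that concern. Its names are loosely styled after Kafka Streams names such as `KSTREAM-SOURCE-0000000000`. The other prefixes and the five-node layout are illustrative assumptions, not the exact names Kafka Streams generates for a join. The model compares two approaches: rewriting an already-named plan in a second pass, and building the single-store plan directly in one pass.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of index-suffixed auto-generated names. Prefixes and the node
// layout (source, two window stores, merge, sink) are illustrative assumptions.
final class NameSuffixSketch {
    private int index = 0;

    // Each call consumes ("burns") the next global index.
    String next(String prefix) {
        return prefix + "-" + String.format("%010d", index++);
    }

    // First pass: names for the unoptimized self-join plan.
    static List<String> unoptimizedNames() {
        NameSuffixSketch gen = new NameSuffixSketch();
        List<String> names = new ArrayList<>();
        names.add(gen.next("KSTREAM-SOURCE"));
        names.add(gen.next("KSTREAM-JOINTHIS") + "-store");
        names.add(gen.next("KSTREAM-JOINOTHER") + "-store");
        names.add(gen.next("KSTREAM-MERGE"));
        names.add(gen.next("KSTREAM-SINK"));
        return names;
    }

    // Second-pass optimization: every suffix was already assigned in the first
    // pass; the rewrite only drops the second store and keeps all other names.
    static List<String> optimizedInSecondPass() {
        List<String> names = unoptimizedNames();
        names.removeIf(n -> n.startsWith("KSTREAM-JOINOTHER"));
        return names;
    }

    // Naive alternative: build the single-store plan directly, so the missing
    // store shifts the suffix of every later node.
    static List<String> optimizedInFirstPass() {
        NameSuffixSketch gen = new NameSuffixSketch();
        List<String> names = new ArrayList<>();
        names.add(gen.next("KSTREAM-SOURCE"));
        names.add(gen.next("KSTREAM-JOINTHIS") + "-store");
        names.add(gen.next("KSTREAM-MERGE"));
        names.add(gen.next("KSTREAM-SINK"));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(unoptimizedNames());
        System.out.println(optimizedInSecondPass());
        System.out.println(optimizedInFirstPass());
    }
}
```

In the second-pass variant, the kept store and the downstream nodes have the same names as before the optimization, so their changelog topics would stay the same. In the one-pass variant, the sink's suffix shifts from 4 to 3, which is the kind of rename a rolling bounce must avoid.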