Hello Vicky,

I made a quick pass on your WIP PR, and I now understand and agree that
compatibility is indeed preserved: since we get the optimized topology in a
second pass, we have already "used and burned" the original topology's
naming suffixes in the first pass.

Regarding the configuration pattern, I still have a bit of a concern:
primarily, if we follow this pattern and introduce a new config for each
optimization rule, in the future we would have a lot of configs (one per
rule) inside StreamsConfig. I went back and forth on this again and still
feel that this may not be what we want. I think instead we can change the
existing `TOPOLOGY_OPTIMIZATION_CONFIG` to accept a comma-separated list of
strings (this aligns with other similar configs as well), so that for
different scenarios users can choose either fine-grained or coarse-grained
control, e.g.:

* I just want to enable all rules, or none: "all", "none".
* I know my app was created with Kafka version X, and I only want to apply
the rules that have been there since version X: "versionX". I just made
this one up for future use cases, since we discussed it in the original KIP
when we introduced `TOPOLOGY_OPTIMIZATION_CONFIG`; we do not need to
include it in this KIP.
* I know my app is compatible with specific rules A/B/C, and I just want to
always enable those and not others: "ruleA,ruleB,ruleC".

So far we only have a few rules: a) reuse source topic as changelog topic
for KTable, b) merge duplicate repartition topics, c) self-join (this KIP),
so I suggest that in this KIP we just make the
`TOPOLOGY_OPTIMIZATION_CONFIG` accept a list of strings, but 1) check
that some strings cannot coexist (e.g. `none` and `all`), and 2) add a new
string value for self-join itself. In this way:

* People who chose `none` before will not be impacted.
* People who chose `all` before will get this optimization by default, and
it's backward compatible so it's okay; they also get what they meant: I
just want "all" :)
* Advanced users who read about this KIP and just want it but not others:
they will change their config from `none` to `self-join`.
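
To make the proposal concrete, here is a rough sketch of how such a
comma-separated value could be parsed and validated. It is only an
illustration, not the actual StreamsConfig code: the class name
`TopologyOptimizationParser` and the rule names `reuse-source-topic` and
`merge-repartition-topics` are made up for this example (only `all`, `none`
and `self-join` appear above), and the "versionX" style value is left out.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Kafka Streams.
public class TopologyOptimizationParser {
    static final String ALL = "all";
    static final String NONE = "none";

    // Assumed rule names for illustration; the real names would be set by the KIP.
    static final Set<String> KNOWN_RULES = new LinkedHashSet<>(
        Arrays.asList("reuse-source-topic", "merge-repartition-topics", "self-join"));

    // Returns the set of rules to enable for a comma-separated config value.
    static Set<String> parse(String value) {
        Set<String> tokens = Arrays.stream(value.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toCollection(LinkedHashSet::new));

        if (tokens.isEmpty()) {
            throw new IllegalArgumentException("No optimization value given");
        }
        // "all" and "none" cannot coexist with each other or with rule names.
        boolean hasKeyword = tokens.contains(ALL) || tokens.contains(NONE);
        if (hasKeyword && tokens.size() > 1) {
            throw new IllegalArgumentException(
                "'all' and 'none' must be used alone, got: " + value);
        }
        if (tokens.contains(ALL)) {
            return new LinkedHashSet<>(KNOWN_RULES);
        }
        if (tokens.contains(NONE)) {
            return new LinkedHashSet<>();
        }
        // Otherwise every token must be a known rule name.
        for (String token : tokens) {
            if (!KNOWN_RULES.contains(token)) {
                throw new IllegalArgumentException("Unknown optimization rule: " + token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(parse("all"));
        System.out.println(parse("none"));
        System.out.println(parse("self-join, merge-repartition-topics"));
    }
}
```

With this shape, an existing `all` automatically picks up any newly added
rule, `none` stays a no-op, and a value such as `none,all` is rejected up
front rather than being silently resolved one way or the other.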

WDYT?


Guozhang




On Fri, Aug 12, 2022 at 7:25 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks for the KIP, Vicky!
>
> Re 1/2, I agree with what you both worked out.
>
> Re 3: It sounds like you were able to preserve backward compatibility, so
> I don’t think you need to add any new configs. I think you can just switch
> it on if people specify “all”.
>
> Thanks!
> -John
>
>
> On Thu, Aug 11, 2022, at 11:27, Guozhang Wang wrote:
> > Thanks Vicky for your reply!
> >
> > Re 1/2): I think you have a great point here to adhere with the existing
> > implementation, I'm convinced. In that case we do not need to consider
> > left/outer-joins, and hence do not need to worry about the extra store in
> > the impl.
> >
> > Re 3): I'm curious how the compatibility is preserved since with
> > optimizations turned on, we would use fewer stores and hence the store
> > name suffixes would change. In your experiment did you specifically
> > specify the store names, e.g. via Materialized? I'd be glad if it turns
> > out to really be conveniently backward compatible, and rest with my
> > concerns :)
> >
> >
> > Guozhang
> >
> > On Thu, Aug 11, 2022 at 4:44 AM Vasiliki Papavasileiou
> > <vpapavasile...@confluent.io.invalid> wrote:
> >
> >> Hi Guozhang,
> >>
> >> Thank you very much for your comments.
> >>
> >> Regarding 1: the extra state store is only needed in outer joins since
> >> that's the only case we have non-joining records that would need to get
> >> emitted when the window closes, right? If we do decide to go with an
> >> outer-join implementation, I will make sure to have the extra state
> store
> >> as well. Thank you for pointing it out.
> >>
> >> Regarding 2: As the self-join is only a physical optimization over an
> inner
> >> join whose two arguments are the same entity, it should return the same
> >> results as the inner join. We wouldn't want a user upgrading and
> enabling
> >> the optimization to suddenly see that their joins behave differently and
> >> produce different results.
> >> As an example, consider the records <A,1> and <A, 2> where A is the key
> and
> >> the number is the value and both are strings. Assume these records are
> >> piped into an input topic. And assume we have a self-join (not
> optimized,
> >> so inner join implementation) whose joiner concatenates the values.
> >> The output of the join after processing the first record is : <A,11>.
> >> The output of the join after processing the second record is: <A,21>,
> >> <A,12>, <A,22>
> >> So, for an inner join whose two arguments are the same stream, a record
> >> does join with itself. And as a user, I would expect the self-join
> >> optimization to produce the same results. What do you think?
> >>
> >> Regarding 3: I did a small experiment and I think the changes I did are
> >> backwards compatible. Basically, I created a topology without the
> >> optimization, had it process some data and killed it. Then I started it
> >> again but with the optimization turned on, and the processing resumed
> fine
> >> as in there was no exception and no extra state stores created and the
> join
> >> results made sense. The optimization is keeping the same state store and
> >> doesn't change the names or indices of nodes in the topology. I will
> >> however need to add a case for self-joins in the upgrade system tests to
> >> make sure that things don't break. Is this sufficient?
> >> Regarding the config, one way to go would be to have one config per
> >> optimization but I am worried that this will get unwieldy if in the
> future
> >> we have a lot of them and also requires the user to know about the
> >> optimizations to be able to benefit from them. Another alternative is to
> >> assume that if the TOPOLOGY_OPTIMIZATION_CONFIG is on (`all`), then all
> >> optimizations are applied. If the user doesn't want a specific
> >> optimization, then they need to turn that one off. So, we will have a
> >> config per optimization but they will be on by default.
> >>
> >> Best,
> >> Vicky
> >>
> >> On Tue, Aug 9, 2022 at 7:03 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> > Hello Vicky,
> >> >
> >> > Thanks for the KIP! I made a quick pass and here are some quick
> thoughts:
> >> >
> >> > 1. Store Implementation: this may be not directly related to the KIP
> >> itself
> >> > since its all internal, but the stream-stream join state store
> >> > implementation has been changed in
> >> > https://issues.apache.org/jira/browse/KAFKA-10847, in which we added
> a
> >> > separate store to maintain all the records that have not found a match
> >> yet,
> >> > and would emit them when time passed for left/outer joins. In this
> >> > optimization, I think we can still go with a single store but we need
> to
> >> > make sure we do not regress on KAFKA-10847, i.e. for records not
> finding
> >> a
> >> > match, we should also emit them when time passed by, this would likely
> >> rely
> >> > on the ability to range-over the only store on its "expired" records.
> A
> >> > good reference would be in the recent works to allow emitting final
> for
> >> > windowed aggregations (cc @Hao Li <lihaos...@gmail.com> who can
> provide
> >> > some more references).
> >> >
> >> > 2. Join Semantics and Outer-Joins: I think we need to clarify for any
> >> > single stream record, would itself also be considered a "match" for
> >> itself,
> >> > OR should we consider only a different record but with the same key
> and
> >> > within the join window a "match" for itself. If it's the former, then
> I
> >> > agree that outer-joins (even left-joins, right?) would not make sense
> >> since
> >> > we would always find at least a match for any record; if it's the
> latter,
> >> > then outer/left joins still make sense and we would need to consider
> the
> >> > store implementation as stated in 1) above. Personally, I think the
> >> latter
> >> > is better --- I know it's a bit away from the RDBMS self-join
> semantics
> >> but
> >> > for RDBMS self-joins are usually not on PKs, but on FKs so I think its
> >> > semantics is less relevant to what we are considering here for
> windowed
> >> > stream-stream joins which are still on PKs.
> >> >
> >> > 3. Compatibility: first of all, I think we should introduce new values
> >> for
> >> > the TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in
> >> addition
> >> > to `all` and `none`, this is also what we discussed before to keep
> >> > compatibility. But for applications that are already running, we'd
> also
> >> > need to make sure that after a rolling bounce with this config value
> >> > changed, we would not break the app. That involves: a) the store names
> >> (and
> >> > hence the changelog names) should not change -- when we use suffixes,
> we
> >> > should make sure they do not change by burning some suffixes as well,
> b)
> >> > the processor names, similar to store names, c) store formats, if we
> ever
> >> > change the store formats, we need to consider a live upgrade path as
> >> well.
> >> >
> >> > Please let me know your thoughts.
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Tue, Aug 2, 2022 at 11:31 AM Vasiliki Papavasileiou
> >> > <vpapavasile...@confluent.io.invalid> wrote:
> >> >
> >> > > Hello everyone,
> >> > >
> >> > > I would like to start the discussion for KIP-862: Implement self-join
> >> > > optimization
> >> > >
> >> > > The KIP can be found here:
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
> >> > >
> >> > > Any suggestions are more than welcome.
> >> > >
> >> > > Many thanks,
> >> > > Vicky
> >> > >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
