Thanks for the KIP, Vicky!

Re 1/2, I agree with what you both worked out. 

Re 3: It sounds like you were able to preserve backward compatibility, so I 
don’t think you need to add any new configs. I think you can just switch it on 
if people specify “all”. 
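
For reference, here is a minimal sketch of what that looks like on the user side. It uses plain `java.util.Properties` with the string forms of the existing config ("topology.optimization", with values "all" and "none") so that it runs without the Kafka jars; the application id, the bootstrap address, and the helper name `streamsProps` are made up for illustration.

```java
import java.util.Properties;

class OptimizationConfigSketch {
    // String forms of StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
    // StreamsConfig.OPTIMIZE and StreamsConfig.NO_OPTIMIZATION.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String ALL = "all";
    static final String NONE = "none";

    // Hypothetical helper: builds the properties a Streams app would be started with.
    static Properties streamsProps(boolean optimize) {
        Properties props = new Properties();
        props.put("application.id", "self-join-demo");     // made-up value
        props.put("bootstrap.servers", "localhost:9092");  // made-up value
        // No new config: "all" would simply include the self-join rewrite.
        props.put(TOPOLOGY_OPTIMIZATION, optimize ? ALL : NONE);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(true).getProperty(TOPOLOGY_OPTIMIZATION));
        System.out.println(streamsProps(false).getProperty(TOPOLOGY_OPTIMIZATION));
    }
}
```

In a real application the same properties would also have to be passed to `StreamsBuilder#build(Properties)`, since that is where topology optimizations are applied.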

Thanks!
-John


On Thu, Aug 11, 2022, at 11:27, Guozhang Wang wrote:
> Thanks Vicky for your reply!
>
> Re 1/2): I think you make a great point about adhering to the existing
> implementation; I'm convinced. In that case we do not need to consider
> left/outer-joins, and hence do not need to worry about the extra store in
> the impl.
>
> Re 3): I'm curious how the compatibility is preserved since, with
> optimizations turned on, we would use fewer stores and hence the store name
> suffixes would change. In your experiment, did you explicitly specify the
> store names, e.g. via Materialized? I'd be glad if it turns out to really
> be conveniently backward compatible; that would put my concerns to rest :)
>
>
> Guozhang
>
> On Thu, Aug 11, 2022 at 4:44 AM Vasiliki Papavasileiou
> <vpapavasile...@confluent.io.invalid> wrote:
>
>> Hi Guozhang,
>>
>> Thank you very much for your comments.
>>
>> Regarding 1: the extra state store is only needed in outer joins since
>> that's the only case where we have non-joining records that would need to
>> be emitted when the window closes, right? If we do decide to go with an
>> outer-join implementation, I will make sure to have the extra state store
>> as well. Thank you for pointing it out.
>>
>> Regarding 2: As the self-join is only a physical optimization over an inner
>> join whose two arguments are the same entity, it should return the same
>> results as the inner join. We wouldn't want a user upgrading and enabling
>> the optimization to suddenly see that their joins behave differently and
>> produce different results.
>> As an example, consider the records <A,1> and <A,2>, where A is the key and
>> the number is the value, and both are strings. Assume these records are
>> piped into an input topic, and assume we have a self-join (not optimized,
>> so using the inner join implementation) whose joiner concatenates the values.
>> The output of the join after processing the first record is: <A,11>.
>> The output of the join after processing the second record is: <A,21>,
>> <A,12>, <A,22>.
>> So, for an inner join whose two arguments are the same stream, a record
>> does join with itself. And as a user, I would expect the self-join
>> optimization to produce the same results. What do you think?
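
The outputs quoted above can be reproduced with a small stand-alone simulation. This is only a sketch under stated assumptions, not the Kafka Streams implementation: it models the unoptimized inner join as two in-memory buffers (one per join side), assumes each record reaches the left side before the right side, and ignores the join window. The class name `SelfJoinSketch` and its `process` method are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class SelfJoinSketch {
    // One buffer per join side: key -> values seen so far (no windowing in this sketch).
    private final Map<String, List<String>> leftStore = new HashMap<>();
    private final Map<String, List<String>> rightStore = new HashMap<>();
    private final BinaryOperator<String> joiner;

    SelfJoinSketch(BinaryOperator<String> joiner) {
        this.joiner = joiner;
    }

    // Feeds one record into both sides of the join and returns what gets emitted.
    List<String> process(String key, String value) {
        List<String> out = new ArrayList<>();
        // Left side (assumed to run first): buffer the record, then probe the right buffer.
        leftStore.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String right : rightStore.getOrDefault(key, Collections.emptyList())) {
            out.add("<" + key + "," + joiner.apply(value, right) + ">");
        }
        // Right side: buffer the record, then probe the left buffer,
        // which already contains this record, so it joins with itself.
        rightStore.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String left : leftStore.get(key)) {
            out.add("<" + key + "," + joiner.apply(left, value) + ">");
        }
        return out;
    }

    public static void main(String[] args) {
        SelfJoinSketch join = new SelfJoinSketch((l, r) -> l + r);
        System.out.println(join.process("A", "1")); // [<A,11>]
        System.out.println(join.process("A", "2")); // [<A,21>, <A,12>, <A,22>]
    }
}
```

Under these assumptions a record always finds itself in the other side's buffer, which is why every record produces at least one result.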
>>
>> Regarding 3: I did a small experiment and I think the changes I made are
>> backwards compatible. Basically, I created a topology without the
>> optimization, had it process some data and killed it. Then I started it
>> again with the optimization turned on, and processing resumed fine: there
>> was no exception, no extra state stores were created, and the join results
>> made sense. The optimization keeps the same state store and doesn't change
>> the names or indices of nodes in the topology. I will, however, need to add
>> a case for self-joins to the upgrade system tests to make sure that things
>> don't break. Is this sufficient?
>> Regarding the config, one way to go would be to have one config per
>> optimization, but I am worried that this will get unwieldy if we have a lot
>> of them in the future, and it also requires the user to know about the
>> optimizations to be able to benefit from them. Another alternative is to
>> assume that if the TOPOLOGY_OPTIMIZATION_CONFIG is on (`all`), then all
>> optimizations are applied. If the user doesn't want a specific
>> optimization, then they need to turn that one off. So, we would have a
>> config per optimization, but they would be on by default.
>>
>> Best,
>> Vicky
>>
>> On Tue, Aug 9, 2022 at 7:03 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Vicky,
>> >
>> > Thanks for the KIP! I made a quick pass and here are some quick thoughts:
>> >
>> > 1. Store Implementation: this may not be directly related to the KIP
>> > itself since it's all internal, but the stream-stream join state store
>> > implementation was changed in
>> > https://issues.apache.org/jira/browse/KAFKA-10847, in which we added a
>> > separate store to maintain all the records that have not found a match
>> > yet, and to emit them once enough time has passed for left/outer joins.
>> > In this optimization, I think we can still go with a single store, but we
>> > need to make sure we do not regress on KAFKA-10847, i.e. records that do
>> > not find a match should also be emitted once time has passed. This would
>> > likely rely on the ability to range over the single store's "expired"
>> > records. A good reference would be the recent work to allow emitting final
>> > results for windowed aggregations (cc @Hao Li <lihaos...@gmail.com> who
>> > can provide some more references).
>> >
>> > 2. Join Semantics and Outer-Joins: I think we need to clarify whether any
>> > single stream record would also be considered a "match" for itself, OR
>> > whether only a different record with the same key and within the join
>> > window counts as a "match". If it's the former, then I agree that
>> > outer-joins (even left-joins, right?) would not make sense since we would
>> > always find at least one match for any record; if it's the latter, then
>> > outer/left joins still make sense and we would need to consider the store
>> > implementation as stated in 1) above. Personally, I think the latter is
>> > better. I know it's a bit away from the RDBMS self-join semantics, but
>> > RDBMS self-joins are usually not on PKs but on FKs, so I think their
>> > semantics are less relevant to what we are considering here for windowed
>> > stream-stream joins, which are still on PKs.
>> >
>> > 3. Compatibility: first of all, I think we should introduce new values for
>> > the TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in
>> > addition to `all` and `none`; this is also what we discussed before to
>> > keep compatibility. But for applications that are already running, we'd
>> > also need to make sure that a rolling bounce with this config value
>> > changed would not break the app. That involves: a) the store names (and
>> > hence the changelog names) should not change: when we use suffixes, we
>> > should make sure they do not change, by burning some suffixes as well; b)
>> > the processor names, similar to store names; c) store formats: if we ever
>> > change the store formats, we need to consider a live upgrade path as well.
>> >
>> > Please let me know your thoughts.
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Aug 2, 2022 at 11:31 AM Vasiliki Papavasileiou
>> > <vpapavasile...@confluent.io.invalid> wrote:
>> >
>> > > Hello everyone,
>> > >
>> > > I would like to start the discussion for KIP-862: Implement self-join
>> > > optimization
>> > >
>> > > The KIP can be found here:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
>> > >
>> > > Any suggestions are more than welcome.
>> > >
>> > > Many thanks,
>> > > Vicky
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> -- 
> -- Guozhang
