Thanks for the discussion, Bill, John, Guozhang, and Matthias!

It sounds like the consensus is that we would like to redesign the join
interfaces more broadly to have fewer overloads, but the scope of that
redesign is larger than makes sense for this particular KIP.

As for the interfaces proposed in this particular KIP, I don't think we
should wrap either left/inner join or the foreign-key extractor into the
new `TableJoined` object, as those changes (if we want to make them) are
better left for the larger redesign. That leaves the question of whether it
makes sense to wrap the optional `Materialized` into the new `TableJoined`
object in this KIP. I'm inclined to leave it out as well since
`Materialized` isn't really a property of the join; rather, it indicates
whether to materialize the result of the table operation (which happens to
be a join). I can see the case for including `Materialized` within
`TableJoined` as part of the larger join redesign in the future, but doing
so now seems likely to introduce more confusion than it resolves: all the
other table operations (including primary-key joins) have method
overloads which accept `Materialized`, and users might miss that the
foreign-key join case is a special snowflake.
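
For concreteness, here is a rough sketch of the shape I have in mind for the
config object: two static entry points plus `withXYZ` setters, and no
`Materialized`. It is only an illustration, not the KIP text. The names
`TableJoinedSketch` and `KeyPartitioner` are hypothetical stand-ins (the
latter for `StreamPartitioner<K, Void>`), chosen so the snippet compiles on
its own without the Streams library.

```java
// Hypothetical stand-in for StreamPartitioner<K, Void>: key-only, since the
// join cannot see the other side's value when it picks a partition.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Sketch of a builder-like config object: two static entry points, with
// everything else set through withXYZ methods that return a new instance.
final class TableJoinedSketch<K, KO> {
    private final KeyPartitioner<K> partitioner;
    private final KeyPartitioner<KO> otherPartitioner;
    private final String name;

    private TableJoinedSketch(KeyPartitioner<K> partitioner,
                              KeyPartitioner<KO> otherPartitioner,
                              String name) {
        this.partitioner = partitioner;
        this.otherPartitioner = otherPartitioner;
        this.name = name;
    }

    // Names the join operation; both partitioners stay unset (null).
    static <K, KO> TableJoinedSketch<K, KO> as(String name) {
        return new TableJoinedSketch<>(null, null, name);
    }

    // Either argument may be null to leave that side unset.
    static <K, KO> TableJoinedSketch<K, KO> with(KeyPartitioner<K> partitioner,
                                                 KeyPartitioner<KO> otherPartitioner) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, null);
    }

    TableJoinedSketch<K, KO> withName(String name) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, name);
    }

    TableJoinedSketch<K, KO> withPartitioner(KeyPartitioner<K> partitioner) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, name);
    }

    TableJoinedSketch<K, KO> withOtherPartitioner(KeyPartitioner<KO> otherPartitioner) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, name);
    }

    KeyPartitioner<K> partitioner() { return partitioner; }
    KeyPartitioner<KO> otherPartitioner() { return otherPartitioner; }
    String name() { return name; }
}
```

With this shape, a caller who only has a custom partitioner on the left table
would write something like `TableJoinedSketch.<String, Integer>with(myPartitioner,
null).withName("fk-join")`, and `Materialized` would continue to be passed
through its own overload as it is for the other table operations.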

That said, I'm happy to wrap `Materialized` into `TableJoined` in this KIP
if others feel differently. Unless that's the case, I don't think the KIP
needs any further updates and plan to call for a vote soon. Thanks again,
all!

- Victoria

On Sat, Sep 25, 2021 at 11:38 AM Matthias J. Sax <mj...@apache.org> wrote:

> The current proposal to replace the `Named` overload with a `TableJoined`
> overload does not increase the number of methods we have. Thus, I don't
> share your concern, and I also think that even if an API redesign may be
> good in general, it is not the right thing to piggyback on this KIP.
>
> I also think that we should not put `inner/left/outer` as a
> _configuration_ for the join in any case, because to me,
> inner/left/outer is really a different (first class) operator.
>
> Furthermore, if we make the key-extractor optional to distinguish
> between PK and FK join, we run into the issue that users could pass a
> key-extractor into an outer join. However, FK joins are not supported
> for outer joins, so we would lose the current API-level (compile-time)
> guard and could only do a runtime check, which I personally think would
> be a step backwards.
>
>
> -Matthias
>
> On 9/24/21 8:21 AM, Bill Bejeck wrote:
> > Hi Victoria,
> >
> > Earlier in the thread, I proposed rolling `Function<V, KO>
> > foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> > byte[]>>` into TableJoined as well. From that perspective, I partially
> > agree with John's latest alternative, but I think we should still retain
> > the `leftJoin`, `join`, and `outerJoin` methods on the KTable interface.
> >
> > I favor this approach as we would be able to get the KTable interface join
> > methods down to 3 (after removal in the next major release). But I realize
> > this is a bit beyond the intention of this KIP, so I'd still support it as
> > is.
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Wed, Sep 22, 2021 at 6:54 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hi Victoria,
> >>
> >> Here's the other concern I mentioned. I didn't bother
> >> bringing it up before because it would be obviated by the
> >> last proposal.
> >>
> >> I'm wondering about the need to add four new overloads and a
> >> new config object. We already have 20 KTable#join overloads,
> >> out of a total of 47 KTable operations. In other words,
> >> after this proposal, almost 50% of the KTable interface will
> >> be devoted to slightly different variations of join().
> >>
> >> This kind of proliferation has a number of downsides, which
> >> I documented in my proposal to fix it here:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>
> >> We decided in the past to defer completely refactoring all
> >> of the Streams DSL because we didn't foresee things becoming
> >> much worse than they were at the time. Not to harp on this
> >> particular KIP, but it does sort of seem like we're
> >> approaching critical mass with 24 join overloads.
> >>
> >> What do you think about seeking some kind of middle ground
> >> by pulling more of the optional join arguments into the
> >> proposed TableJoined interface? After all, starting to chip
> >> away at this problem was the idea behind making these config
> >> objects implement NamedOperation to begin with.
> >>
> >> Specifically, this idea would be to add just one overload:
> >> KTable#join(TableJoined<K, KO, VO, VR>)
> >>
> >> And the TableJoined interface would have one static method
> >> with the required `KTable<KO, VO> other` and `ValueJoiner<V,
> >> VO, VR> joiner` arguments.
> >>
> >> The rest of the arguments (isLeft, fkExtractor,
> >> materialized, etc) would be settable using `withXYZ` builder
> >> methods.
> >>
> >> If that proposal isn't clear, I'd be happy to provide a code
> >> sketch.
> >>
> >> Thanks in advance for your consideration, and thanks again
> >> for driving this,
> >> -John
> >>
> >>
> >> On Wed, 2021-09-22 at 17:34 -0500, John Roesler wrote:
> >>> Thanks, Guozhang!
> >>>
> >>> I agree it would be bigger in scope to approach it that way.
> >>> We certainly would need to implement a way for the
> >>> partitioner to flow down the topology, like we do with
> >>> serdes and grace period information. It's not super complex,
> >>> but not trivial either.
> >>>
> >>> I think we've always been mildly uncomfortable with
> >>> assumption (1), but we just kind of hope for the best for
> >>> performance reasons. But that's because the alternative we
> >>> considered in the past was to proactively repartition all
> >>> inputs. If the question is instead just getting a
> >>> partitioner and validating that the partitioning of the
> >>> table is correct, I think the performance question is much
> >>> different. However, we have also gone a really long time
> >>> with assumption (1) in place, and I don't believe it has
> >>> ever been an issue for anyone.
> >>>
> >>> I appreciate your consideration. I still think it would be
> >>> nice to approach it more from a data provenance perspective,
> >>> but I don't feel strongly about it. I'm happy to leave it up
> >>> to Victoria's discretion.
> >>>
> >>> I have one more comment about the KIP, but I'll send a
> >>> separate message for clarity.
> >>>
> >>> Thanks again for considering it,
> >>> -John
> >>>
> >>> On Wed, 2021-09-22 at 14:50 -0700, Guozhang Wang wrote:
> >>>> Thanks Victoria for writing the KIP! I think this is a miss from when we
> >>>> designed KIP-213 and should be fixed in syntax. Regarding how large its
> >>>> scope should be, here are my thoughts:
> >>>>
> >>>> 1) Today Streams does not take any indicator on how the input
> >>>> stream/table is partitioned; instead it simply assumes that the input
> >>>> stream/table is always partitioned by the key already. This is by design:
> >>>> we let the users make sure this is always the case. For internal
> >>>> repartitioning like `groupBy`s, Streams does guarantee that the
> >>>> repartition topics are partitioned by the new grouping keys, but
> >>>> nevertheless the assumption still holds: for all stream/table entities
> >>>> defined from an input topic (whether external or internal), that topic is
> >>>> partitioned by the stream/table key, and hence at the moment we do not
> >>>> require any partitioners to be passed in since we do not need them.
> >>>>
> >>>> 2) For all operators that need to write to a partition, today the
> >>>> partitioners are either defined by the operator logic itself (think:
> >>>> groupBys, where the partitioner is hard-coded as by the grouping key), or
> >>>> user-customized (think: the #through/to APIs). We do not have any
> >>>> scenarios where we need to "inherit" partitioners from parent operators,
> >>>> until FK-joins. Here, we need to make sure: a) the left table A's input
> >>>> topic and the internal response topic are co-partitioned; b) the right
> >>>> table B's input topic and the internal subscription topic are
> >>>> co-partitioned. Of course, if both left table A's and right table B's
> >>>> input topics are partitioned by the default partitioner, then both hold.
> >>>> But the assumption above only requires that the "topic is partitioned by
> >>>> the key", not that "the topic is partitioned by the key, following
> >>>> exactly the default partitioner mechanism" (e.g. in
> >>>> https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when
> >>>> a different partitioner which is not based on hashing of the bytes is
> >>>> used, which still guarantees "the input topic is partitioned by key").
> >>>>
> >>>> So I feel that if assumption 1) above in Streams should still hold in
> >>>> the long run, it's not very meaningful to require the source tables to
> >>>> indicate their partitioners, but only to require the FK join operator
> >>>> itself to make sure the co-partition conditions a) and b) above hold. Of
> >>>> course the easiest way is to require users to pass in the partitioners
> >>>> for those two internal topics, which they have to make sure are the same
> >>>> as the input partitioners. We can also have a slightly more complicated
> >>>> approach with some "inheritance" rule for partitioners when they are
> >>>> given at the sink (we have similar rules for serde inheritance throughout
> >>>> the topology), but that only fixes the "#through / #repartition" cases,
> >>>> not the source "builder#table" cases -- i.e. we still need to require
> >>>> users to indicate partitioners which we can hopefully inherit within the
> >>>> topology.
> >>>>
> >>>> I agree that for IQ, it would be beneficial if we had the partitioner for
> >>>> each table/stream entity so that IQ itself does not need the partitioners
> >>>> to be specified, but to make this fix work, we'd probably need both 1)
> >>>> source table/stream partitioner specification and 2) an inheritance rule
> >>>> (otherwise we'd have to force users to specify the same for #repartition
> >>>> etc. as well?). Given that rationale, I'm slightly leaning towards the
> >>>> current proposal in the KIP doc, i.e. just fixing this FK operator
> >>>> only, with the easy approach that requires users themselves to set
> >>>> partitioners accordingly.
> >>>>
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org> wrote:
> >>>>
> >>>>> Thanks for the KIP, Victoria!
> >>>>>
> >>>>> This is a great catch. As far as my memory serves, we
> >>>>> totally missed this case in the design of FK joins.
> >>>>>
> >>>>> tl;dr: I'm wondering if it might be better to instead
> >>>>> introduce a way to specify the partitioning scheme when we
> >>>>> create a KTable and then just use it when we join, rather
> >>>>> than to specify it in the join itself.
> >>>>>
> >>>>> I was initially surprised to see the proposal to add these
> >>>>> partitioners in the join operation itself rather than
> >>>>> inheriting them from the tables on both sides, but I
> >>>>> reviewed the Streams DSL and see that we _cannot_ inherit it
> >>>>> because almost all the time, the input KTables' partitioning
> >>>>> is not known!
> >>>>>
> >>>>> It seems like the only times we specify a partitioner on a
> >>>>> DSL object are:
> >>>>> 1. when we produce to an output topic via KStream#to or
> >>>>> KStream#through.
> >>>>> 2. when we repartition via KStream#repartition
> >>>>>
> >>>>> These are both specifying the partitioner to use in output
> >>>>> operations (i.e., we are telling Streams the partitioner *to
> >>>>> use*); there's currently only one situation in which we have
> >>>>> to _inform_ Streams about the partitioning of a KTable or
> >>>>> KStream:
> >>>>> 3. when we issue a key-based query via IQ, we need to know
> >>>>> the partitioner, so the IQ interface allows us to pass in a
> >>>>> custom partitioner with the query.
> >>>>>
> >>>>> This is a bit weird. Taking a step back, the partitioning
> >>>>> scheme is a property of the table itself, not of the query
> >>>>> (or the join). Specifying a table property as part of a
> >>>>> query (or join) on the table seems to be an indication that
> >>>>> the KTable definition itself is lacking something.
> >>>>>
> >>>>> Perhaps a more comprehensive approach would be to add an
> >>>>> optional StreamPartitioner parameter to StreamsBuilder#table.
> >>>>> Then, queries and joins (and anything else that is
> >>>>> partitioner-sensitive now and in the future) could simply
> >>>>> inherit the partitioner via the topology.
> >>>>>
> >>>>> WDYT?
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> >>>>>> Hi Victoria,
> >>>>>>
> >>>>>> Thanks for the KIP; this is a beneficial addition.
> >>>>>>
> >>>>>> I'm a +1 on the KIP and the changes made:
> >>>>>>
> >>>>>>     - Using a config object TableJoined
> >>>>>>     - Limiting the static methods to two
> >>>>>>
> >>>>>> I have an additional "wild" thought about rolling the `Function<V, KO>
> >>>>>> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> >>>>>> byte[]>>` into TableJoined to align and reduce the number of KTable
> >>>>>> interfaces. But I don't have a strong opinion on this; I am curious to
> >>>>>> see what others think about this possibility.
> >>>>>>
> >>>>>> -Bill
> >>>>>>
> >>>>>> On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> Thanks for updating the KIP.
> >>>>>>>
> >>>>>>> One nit:
> >>>>>>>
> >>>>>>>> The existing methods which accept Named will be marked for
> >>>>>>>> deprecation in 4.0.
> >>>>>>>
> >>>>>>> We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and
> >>>>>>> (2) a KIP could always slip into a future release.
> >>>>>>>
> >>>>>>>
> >>>>>>> About `TableJoined`: It seems you propose to add static methods for
> >>>>>>> all possible parameter combinations. We usually try to avoid this to
> >>>>>>> keep the number of methods low; if we add too many methods, it defeats
> >>>>>>> the purpose of using a "builder like" config object.
> >>>>>>>
> >>>>>>> To me, it seems sufficient to only have two static methods:
> >>>>>>>
> >>>>>>>> as(final String name);
> >>>>>>>
> >>>>>>> and
> >>>>>>>
> >>>>>>>> with(final StreamPartitioner<K, Void> partitioner,
> >>>>>>>>       final StreamPartitioner<KO, Void> otherPartitioner);
> >>>>>>>
> >>>>>>> The second one should allow passing in `null` to set only one of the
> >>>>>>> two partitioners.
> >>>>>>>
> >>>>>>> Curious to hear what others think.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 9/20/21 8:27 PM, Victoria Xia wrote:
> >>>>>>>> Hi Matthias,
> >>>>>>>>
> >>>>>>>> Thanks for having a look at the KIP! I've updated it with your
> >>>>>>>> suggestion to introduce a new `TableJoined` object with partitioners
> >>>>>>>> of type `StreamPartitioner<K, Void>` and `StreamPartitioner<KO,
> >>>>>>>> Void>`, and to deprecate the existing FK join methods which accept a
> >>>>>>>> `Named` object accordingly. I agree it makes sense to keep the number
> >>>>>>>> of join interfaces smaller.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Victoria
> >>>>>>>>
> >>>>>>>> On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the KIP Victoria.
> >>>>>>>>>
> >>>>>>>>> As pointed out on the Jira ticket by you, using `<K,V>` and
> >>>>>>>>> `<KO,VO>` as partitioner types does not really work, because we
> >>>>>>>>> don't have access to the right value on the left side, nor do we
> >>>>>>>>> have access to the left value on the right-hand side. -- I like your
> >>>>>>>>> idea to use `Void` as value types to make it clear to the users that
> >>>>>>>>> partitioning must be done on the key only.
> >>>>>>>>>
> >>>>>>>>> For the proposed public API change, I would propose not to pass the
> >>>>>>>>> partitioners directly, but to introduce a config object (similar to
> >>>>>>>>> `Joined` for stream-table joins, and `StreamJoined` for
> >>>>>>>>> stream-stream joins). This new object could also implement
> >>>>>>>>> `NamedOperation` and thus replace `Named`. To this end, we would
> >>>>>>>>> deprecate the existing methods using `Named` and replace them with
> >>>>>>>>> the new methods. The net benefit is that we don't get more overloads
> >>>>>>>>> (after we remove the deprecated ones).
> >>>>>>>>>
> >>>>>>>>> Not sure what we want to call the new object. Maybe `TableJoined`,
> >>>>>>>>> in alignment with `StreamJoined`?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 9/15/21 3:36 PM, Victoria Xia wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I've opened a small KIP for adding Kafka Streams support for
> >>>>>>>>>> foreign key joins on tables with custom partitioners:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> >>>>>>>>>>
> >>>>>>>>>> Feedback appreciated. Thanks!
> >>>>>>>>>>
> >>>>>>>>>> - Victoria
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >>
> >
>
