Hi Victoria,

Earlier in the thread, I proposed rolling `Function<V, KO>
foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
byte[]>>` into TableJoined as well. From that perspective, I partially
agree with John's latest alternative, but I think we should still retain
the `leftJoin`, `join`, and `outerJoin` methods on the KTable interface.

I favor this approach as we would be able to get the KTable interface join
methods down to 3 (after removal in the next major release).  But I realize
this is a bit beyond the intention of this KIP, so I'd still support it as
is.

Thanks,
Bill



On Wed, Sep 22, 2021 at 6:54 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Victoria,
>
> Here's the other conern I mentioned. I didn't bother
> bringing it up before because it would be obviated by the
> last proposal.
>
> I'm wondering about the need to add four new overloads and a
> new config object. We already have 20 KTable#join overloads,
> out of a total of 47 KTable operations. In other words,
> after this proposal, almost 50% of the KTable interface will
> be devoted to slightly different variations of join().
>
> This kind of proliferation has a number of downsides, which
> I documented in my proposal to fix it here:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
>
> We decided in the past to defer completely refactoring all
> of the Streams DSL because we didn't forsee things becoming
> much worse than they were at the time. Not to harp on this
> particular KIP, but it does sort of seem like we're
> approaching critical mass with 24 join overloads.
>
> What do you think about seeking some kind of middle ground
> by pulling more of the optional join arguments into the
> proposed TableJoined interface? After all, starting to chip
> away at this problem was the idea behind making these config
> objects implement NamedOperation to begin with.
>
> Specifically, this idea would be to add just one overload:
> KTable#join(TableJoined<K, KO, VO, VR>)
>
> And the TableJoined interface would have one static method
> with the required `KTable<KO, VO> other` and `ValueJoiner<V,
> VO, VR> joiner` arguments.
>
> The rest of the arguments (isLeft, fkExtractor,
> materialized, etc) would be settable using `withXYZ` builder
> methods.
>
> If that proposal isn't clear, I'd be happy to provide a code
> sketch.
>
> Thanks in advance for your consideration, and thanks again
> for driving this,
> -John
>
>
> On Wed, 2021-09-22 at 17:34 -0500, John Roesler wrote:
> > Thanks, Guozhang!
> >
> > I agree it would be bigger in scope to approach it that way.
> > We certainly would need to implement a way for the
> > partitioner to flow down the topology, like we do with
> > serdes and grace period information. It's not super complex,
> > but not trivial either.
> >
> > I think we've always been mildly uncomfortable with
> > assumption (1), but we just kind of hope for the best for
> > performance reasons. But that's because the alternative we
> > considered in the past was to proactively repartition all
> > inputs. If the question is instead just getting a
> > partitioner and validating that the partitioning of the
> > table is correct, I think the performance question is much
> > different. However, we have also gone a really long time
> > with assumption (1) in place, and I don't believe it has
> > ever been an issue for anyone.
> >
> > I appreciate your consideration. I still think it would be
> > nice to approach it more from a data provenance perspective,
> > but I don't feel strongly about it. I'm happy to leave it up
> > to Victoria's discretion.
> >
> > I have one more comment about the KIP, but I'll send a
> > separate message for clarity.
> >
> > Thanks again for considering it,
> > -John
> >
> > On Wed, 2021-09-22 at 14:50 -0700, Guozhang Wang wrote:
> > > Thanks Victoria for writing the KIP! I think this is a miss when we
> > > designed KIP-213 and should be fixed in syntax. Regarding how large its
> > > scope should be, here's my thoughts:
> > >
> > > 1) Today Streams does not take any indicator on how the input
> stream/table
> > > are partitioned, instead it simply assumes that the input stream/table
> is
> > > always partitioned by the key already. This is by-design that we let
> the
> > > users to make sure this is always the case; for internal repartitioning
> > > like `groupBy`s, Streams does guarantee the the repartition topics are
> > > partitioned by the new grouping keys, but nevertheless the assumption
> still
> > > holds: for all stream/table entities defined from a input topic
> (whether
> > > external or internal), that topic is partitioned by the stream/table
> key,
> > > and hence at the moment we do not require any partitioners to be
> passed in
> > > since we do not need it.
> > >
> > > 2) For all operators that need to write to a partition, today the
> > > partitioners are either defined by the operator logic itself (think:
> > > groupBys, where partitioner is hard-coded as by the grouping-key), or
> > > user-customized (think: the #through/to APIs). We do not have any
> scenarios
> > > where we need to "inherit" partitioners from parent operators, until
> > > FK-joins. Here, we need to make sure: a) the left table A's input
> topic and
> > > the internal response topic are co-partitioned; b) the right table B's
> > > input topic and the internal subscription topic are co-partitioned. Of
> > > course, if both left table A and right table B's input topic are
> > > partitioned by the default partitioner, then both holds. But the
> assumption
> > > above only requires that the "topic is partitioned by the key", not
> > > requiring "the topic is partitioned by the key, following exactly the
> > > default partitioner mechanism" (e.g. in
> > > https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises
> when a
> > > different partitioner which is not based on hashing of the bytes is
> used,
> > > which still guarantees "the input topic is partitioned by key").
> > >
> > > So I feel that if we feel the assumption 1) above in Streams should
> still
> > > hold in the long run, it's not very meaningful to require the source
> tables
> > > to indicate their partitioners, but only require the FK join operators
> > > itself to make sure the co-partition conditions a) and b) above holds.
> Of
> > > course the easiest way is to require users to pass-in the partitioners
> for
> > > those two internal topics, which they have to make sure are the same
> as the
> > > input partitioners. We can also have a bit more complicated approach to
> > > have some "inheritance" rule for partitioners when they are given at
> the
> > > sink (we have similar rules for serde inheritance throughout the
> topology),
> > > but that only fixes for "#through / #repartition" cases, but not fixes
> for
> > > source "builder#table" cases -- i.e. we still need to require users to
> > > indicate partitioners which we can hopefully inherit within the
> topology.
> > >
> > > I agree that for IQ, it would be beneficial if we have the partitioner
> for
> > > each table/stream entities so that IQ itself does not need the
> partitioners
> > > to be specified, but to make this fix work, we'd probably need both 1)
> > > source table/stream partitioner specification and 2) inheritance rule
> > > (otherwise we'd have to enforce users to specify the same for
> #repartition
> > > etc as well?). Given that rationale, I'm slightly leaning towards the
> > > current proposal in the KIP doc, i.e. just fixing for this FK operator
> > > only, with the easy approach that requires users themselves to set
> > > partitioners accordingly.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org>
> wrote:
> > >
> > > > Thanks for the KIP, Victoria!
> > > >
> > > > This is a great catch. As fas as my memory serves, we
> > > > totally missed this case in the design of FK joins.
> > > >
> > > > tl;dr: I'm wondering if it might be better to instead
> > > > introduce a way to specify the partitioning scheme when we
> > > > create a KTable and then just use it when we join, rather
> > > > than to specify it in the join itself.
> > > >
> > > > I was initially surprised to see the proposal to add these
> > > > partitioners in the join operation itself rather than
> > > > inheriting them from the tables on both sides, but I
> > > > reviewed the Streams DSL and see that we _cannot_ inherit it
> > > > because almost all the time, the input KTables' partitioning
> > > > is not known!
> > > >
> > > > It seems like the only times we specify a partitioner on a
> > > > DSL object is:
> > > > 1. when we produce to an output topic via KStream#to or
> > > > KStream#through.
> > > > 2. when we repartition via KStream#repartition
> > > >
> > > > These are both specifying the partitioner to use in output
> > > > operations (ie, we are telling Streams the partition *to
> > > > use*); there's currently only one situation in which we have
> > > > to _inform_ streams about the partitioning of a KTable or
> > > > KStream:
> > > > 3. when we issue a key-based query via IQ, we need to know
> > > > the partitioner, so the IQ interface allows us to pass in a
> > > > custom partitioner with the query.
> > > >
> > > > This is a bit weird. Taking a step back, the partitioning
> > > > scheme is a property of the table itself, not of the query
> > > > (or the join). Specifying a table property as part of a
> > > > query (or join) on the table seems to be an indication that
> > > > the KTable definition itself is lacking something.
> > > >
> > > > Perhaps a more comprehensive approach would be to add an
> > > > optional StreamPartitioner parameter to StreamBuilder#table.
> > > > Then, queries and joins (and anything else that partitioner-
> > > > sensitive now and in the future) could simply inherit the
> > > > partitioner via the topology.
> > > >
> > > > WDYT?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> > > > > Hi Victoria,
> > > > >
> > > > > Thanks for the KIP; this is a beneficial addition.
> > > > >
> > > > > I'm a +1 on the KIP and the changes made:
> > > > >
> > > > >    - Using a config object TableJoined
> > > > >    - Limiting the static methods to two
> > > > >
> > > > > I have an additional "wild" thought about rolling the `Function<V,
> KO>
> > > > > foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> > > > > byte[]>>` into TableJoined to align and reduce the number of KTable
> > > > > interfaces.  But I don't have a strong opinion on this; I am
> curious to
> > > > see
> > > > > what others think about this possibility.
> > > > >
> > > > > -Bill
> > > > >
> > > > > On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Thanks for updating the KIP.
> > > > > >
> > > > > > One nit:
> > > > > >
> > > > > > > The existing methods which accept Named will be marked for
> > > > deprecation
> > > > > > in 4.0.
> > > > > >
> > > > > > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0)
> and
> > > > (2)
> > > > > > a KIP could always slip into a future release.
> > > > > >
> > > > > >
> > > > > > About `TableJoined`: It seems you propose to add static methods
> for all
> > > > > > possible parameter combination. We usually try to avoid this to
> keep
> > > > the
> > > > > > number of methods low; if we add too many methods, it defeats the
> > > > > > purpose to use a "builder like" config object.
> > > > > >
> > > > > > To me, it seems sufficient to only have two static methods:
> > > > > >
> > > > > > > as(final String name);
> > > > > >
> > > > > > and
> > > > > >
> > > > > > > with(final StreamPartitioner<K, Void> partitioner,
> > > > > > >      final StreamPartitioner<KO, Void> otherPartitioner);
> > > > > >
> > > > > > The second one should allow to pass in `null` to only set one of
> both
> > > > > > partitioners.
> > > > > >
> > > > > > Curious to hear what other think.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > > > > > Hi Matthias,
> > > > > > >
> > > > > > > Thanks for having a look at the KIP! I've updated it with your
> > > > suggestion
> > > > > > > to introduce a new `TableJoined` object with partitioners of
> type
> > > > > > > `StreamPartitioner<K, Void>` and `StreamPartitioner<KO,
> Void>`, and
> > > > to
> > > > > > > deprecate the existing FK join methods which accept a `Named`
> object
> > > > > > > accordingly. I agree it makes sense to keep the number of join
> > > > interfaces
> > > > > > > smaller.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Victoria
> > > > > > >
> > > > > > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <
> mj...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP Victoria.
> > > > > > > >
> > > > > > > > As pointed out on the Jira ticket by you, using `<K,V>` and
> > > > `<KO,VO>` as
> > > > > > > > partitioner types does not really work, because we don't have
> > > > access to
> > > > > > > > the right value on the left side nor have we access to the
> left
> > > > value on
> > > > > > > > the right hand side. -- I like your idea to use `Void` as
> value
> > > > types to
> > > > > > > > make it clear to the users that partitioning must be done on
> the
> > > > key
> > > > > > only.
> > > > > > > >
> > > > > > > > For the proposed public API change, I would propose not to
> pass the
> > > > > > > > partitioners directly, but to introduce a config object
> (similar to
> > > > > > > > `Joined` for stream-table joins, and `StreamJoined` for
> > > > stream-stream
> > > > > > > > joins). This new object could also implement
> `NamedOperation` and
> > > > thus
> > > > > > > > replace `Named`. To this end, we would deprecate the existing
> > > > methods
> > > > > > > > using `Named` and replace them with the new methods. Net
> benefit
> > > > is,
> > > > > > > > that we don't get more overloads (after we removed the
> deprecated
> > > > ones).
> > > > > > > >
> > > > > > > > Not sure how we want to call the new object. Maybe
> `TableJoined` in
> > > > > > > > alignment to `StreamJoined`?
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I've opened a small KIP for adding Kafka Streams support
> for
> > > > foreign
> > > > > > key
> > > > > > > > > joins on tables with custom partitioners:
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > > > > > >
> > > > > > > > > Feedback appreciated. Thanks!
> > > > > > > > >
> > > > > > > > > - Victoria
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > >
> >
> >
>
>
>

Reply via email to