Thanks, Victoria, for writing the KIP! I think this was a miss when we
designed KIP-213, and it should be fixed in the API syntax. Regarding how
large its scope should be, here are my thoughts:

1) Today Streams does not take any indicator of how the input stream/table
is partitioned; instead, it simply assumes that the input stream/table is
always partitioned by the key already. This is by design: we leave it to
users to make sure this is always the case. For internal repartitioning
like `groupBy`s, Streams does guarantee that the repartition topics are
partitioned by the new grouping keys, but the assumption still holds
nevertheless: for all stream/table entities defined from an input topic
(whether external or internal), that topic is partitioned by the
stream/table key. Hence, at the moment we do not require any partitioners
to be passed in, since we do not need them.

2) For all operators that need to write to a partition, today the
partitioners are either defined by the operator logic itself (think:
groupBys, where the partitioner is hard-coded to use the grouping key) or
user-customized (think: the #through/#to APIs). We did not have any
scenarios where we need to "inherit" partitioners from parent operators
until FK-joins. Here, we need to make sure that: a) the left table A's
input topic and the internal response topic are co-partitioned; b) the
right table B's input topic and the internal subscription topic are
co-partitioned. Of course, if both left table A's and right table B's
input topics are partitioned by the default partitioner, then both hold.
But the assumption above only requires that "the topic is partitioned by
the key"; it does not require that "the topic is partitioned by the key,
following exactly the default partitioner mechanism" (e.g. in
https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a
different partitioner that is not based on hashing the bytes is used,
which still guarantees that "the input topic is partitioned by key").
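
To make the co-partitioning point concrete, here is a minimal,
dependency-free sketch. It shows two partitioners that both depend only on
the key and still place the same key on different partitions. All names
(`CoPartitionSketch`, `KeyPartitioner`, `HASH`, `RANGE`) are hypothetical,
and `HASH` uses `String.hashCode()` purely as a stand-in; it is not the
hash over serialized bytes that the default partitioner actually applies.

```java
import java.util.List;

final class CoPartitionSketch {
    /** Key-only partitioner; hypothetical stand-in for StreamPartitioner<K, Void>. */
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Stand-in for a hash-based default (assumption: NOT Kafka's real hash).
    static final KeyPartitioner<String> HASH =
        (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);

    // A non-hash scheme: contiguous alphabet ranges by the key's first letter.
    // Assumes every key starts with a letter in a-z.
    static final KeyPartitioner<String> RANGE =
        (key, numPartitions) ->
            (Character.toLowerCase(key.charAt(0)) - 'a') * numPartitions / 26;

    /** True if both partitioners send every sample key to the same partition. */
    static boolean agreeOn(List<String> keys, int numPartitions,
                           KeyPartitioner<String> p1, KeyPartitioner<String> p2) {
        return keys.stream().allMatch(
            k -> p1.partition(k, numPartitions) == p2.partition(k, numPartitions));
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "z");
        int partitions = 4;
        for (String k : keys) {
            System.out.printf("%s -> hash=%d range=%d%n",
                k, HASH.partition(k, partitions), RANGE.partition(k, partitions));
        }
        // Both schemes are "partitioned by key", yet they disagree.
        System.out.println("agree: " + agreeOn(keys, partitions, HASH, RANGE));
    }
}
```

If table B's input topic were written with something like `RANGE` while the
subscription topic used something like `HASH`, a subscription for a given
foreign key could land on a different partition than B's row for that key,
which is the failure mode condition b) is meant to rule out.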

So I feel that if the assumption in 1) above should still hold in Streams
in the long run, it's not very meaningful to require the source tables
to indicate their partitioners; we only need to require the FK join
operator itself to make sure that co-partitioning conditions a) and b)
above hold. Of course, the easiest way is to require users to pass in the
partitioners for those two internal topics, which they have to make sure
are the same as the input partitioners. We could also take a slightly more
complicated approach and have some "inheritance" rule for partitioners
when they are given at the sink (we have similar rules for serde
inheritance throughout the topology), but that only fixes the
"#through / #repartition" cases, not the source "builder#table" cases --
i.e. we would still need to require users to indicate partitioners, which
we can hopefully inherit within the topology.
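
As a rough illustration of why an inheritance rule alone does not cover
source tables, here is a toy model (not Streams internals; every name in it
is hypothetical). A node resolves its partitioner by walking up to the
nearest ancestor that was given one. The sketch ignores key-changing
operations, which a real rule would have to treat as breaking inheritance.

```java
import java.util.Optional;

final class PartitionerInheritanceSketch {
    /** Toy topology node; the partitioner is just a label here. */
    static final class Node {
        final String name;
        final Node parent;         // null for source nodes
        final String partitioner;  // null when the user supplied none

        Node(String name, Node parent, String partitioner) {
            this.name = name;
            this.parent = parent;
            this.partitioner = partitioner;
        }

        /** Nearest partitioner found on the path to the root, if any. */
        Optional<String> resolvePartitioner() {
            for (Node n = this; n != null; n = n.parent) {
                if (n.partitioner != null) {
                    return Optional.of(n.partitioner);
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // A sink-like node where the user passed a partitioner: inheritable.
        Node repartition = new Node("repartition", null, "customPartitioner");
        Node mapped = new Node("mapValues", repartition, null);
        System.out.println(mapped.resolvePartitioner());

        // A source table has nothing to inherit from.
        Node table = new Node("builder#table", null, null);
        Node filtered = new Node("filter", table, null);
        System.out.println(filtered.resolvePartitioner());
    }
}
```

The second chain resolves to nothing, which mirrors the gap described
above: without a way to declare the partitioner at the source, there is
nothing for downstream operators to inherit.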

I agree that for IQ, it would be beneficial to have the partitioner for
each table/stream entity so that IQ itself does not need the partitioners
to be specified, but to make this fix work, we'd probably need both 1)
source table/stream partitioner specification and 2) an inheritance rule
(otherwise we'd have to force users to specify the same partitioner for
#repartition etc. as well?). Given that rationale, I'm slightly leaning
towards the current proposal in the KIP doc, i.e. just fixing this for the
FK join operator only, with the easy approach that requires users
themselves to set partitioners accordingly.
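
For what the "easy approach" might look like in shape, here is a
self-contained mock-up. It is not the Kafka Streams API: `JoinPartitioners`,
`KeyPartitioner`, `hashDefault`, and `RANGE` are hypothetical names, and the
fallback hash is only a stand-in for the real default. The user hands the
join two key-only partitioners, one per internal topic, and leaves either
one null to keep the default.

```java
final class FkJoinPartitionersSketch {
    /** Key-only partitioner; hypothetical stand-in for StreamPartitioner<K, Void>. */
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Stand-in default (assumption: not the hash Kafka really uses).
    static <T> KeyPartitioner<T> hashDefault() {
        return (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
    }

    // Example custom scheme for table B; assumes keys start with a-z.
    static final KeyPartitioner<String> RANGE =
        (key, numPartitions) ->
            (Character.toLowerCase(key.charAt(0)) - 'a') * numPartitions / 26;

    /** Hypothetical holder for the two partitioners an FK join would need. */
    static final class JoinPartitioners<K, KO> {
        final KeyPartitioner<K> partitioner;        // response topic (A's keys)
        final KeyPartitioner<KO> otherPartitioner;  // subscription topic (FKs)

        private JoinPartitioners(KeyPartitioner<K> p, KeyPartitioner<KO> other) {
            this.partitioner = p;
            this.otherPartitioner = other;
        }

        /** A null argument falls back to the stand-in default. */
        static <K, KO> JoinPartitioners<K, KO> with(KeyPartitioner<K> p,
                                                    KeyPartitioner<KO> other) {
            KeyPartitioner<K> left = (p != null) ? p : hashDefault();
            KeyPartitioner<KO> right = (other != null) ? other : hashDefault();
            return new JoinPartitioners<>(left, right);
        }
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Table A (Integer keys) uses the default; table B uses RANGE.
        JoinPartitioners<Integer, String> cfg = JoinPartitioners.with(null, RANGE);
        String foreignKey = "z";
        // The subscription lands where B's row for the same key lives.
        System.out.println(cfg.otherPartitioner.partition(foreignKey, partitions)
            == RANGE.partition(foreignKey, partitions));
    }
}
```

The burden stays on the user: nothing in this shape can verify that the
partitioner passed in really matches how the input topic was written.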



Guozhang



On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org> wrote:

> Thanks for the KIP, Victoria!
>
> This is a great catch. As far as my memory serves, we
> totally missed this case in the design of FK joins.
>
> tl;dr: I'm wondering if it might be better to instead
> introduce a way to specify the partitioning scheme when we
> create a KTable and then just use it when we join, rather
> than to specify it in the join itself.
>
> I was initially surprised to see the proposal to add these
> partitioners in the join operation itself rather than
> inheriting them from the tables on both sides, but I
> reviewed the Streams DSL and see that we _cannot_ inherit it
> because almost all the time, the input KTables' partitioning
> is not known!
>
> It seems like the only times we specify a partitioner on a
> DSL object is:
> 1. when we produce to an output topic via KStream#to or
> KStream#through.
> 2. when we repartition via KStream#repartition
>
> These are both specifying the partitioner to use in output
> operations (ie, we are telling Streams the partition *to
> use*); there's currently only one situation in which we have
> to _inform_ Streams about the partitioning of a KTable or
> KStream:
> 3. when we issue a key-based query via IQ, we need to know
> the partitioner, so the IQ interface allows us to pass in a
> custom partitioner with the query.
>
> This is a bit weird. Taking a step back, the partitioning
> scheme is a property of the table itself, not of the query
> (or the join). Specifying a table property as part of a
> query (or join) on the table seems to be an indication that
> the KTable definition itself is lacking something.
>
> Perhaps a more comprehensive approach would be to add an
> optional StreamPartitioner parameter to StreamsBuilder#table.
> Then, queries and joins (and anything else that is
> partitioner-sensitive now and in the future) could simply
> inherit the partitioner via the topology.
>
> WDYT?
>
> Thanks,
> -John
>
> On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> > Hi Victoria,
> >
> > Thanks for the KIP; this is a beneficial addition.
> >
> > I'm a +1 on the KIP and the changes made:
> >
> >    - Using a config object TableJoined
> >    - Limiting the static methods to two
> >
> > I have an additional "wild" thought about rolling the `Function<V, KO>
> > foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> > byte[]>>` into TableJoined to align and reduce the number of KTable
> > interfaces.  But I don't have a strong opinion on this; I am curious to
> > see what others think about this possibility.
> >
> > -Bill
> >
> > On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > Thanks for updating the KIP.
> > >
> > > One nit:
> > >
> > > > The existing methods which accept Named will be marked for
> > > > deprecation in 4.0.
> > >
> > > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and
> > > (2) a KIP could always slip into a future release.
> > >
> > >
> > > About `TableJoined`: It seems you propose to add static methods for all
> > > possible parameter combinations. We usually try to avoid this to keep
> > > the number of methods low; if we add too many methods, it defeats the
> > > purpose of using a "builder like" config object.
> > >
> > > To me, it seems sufficient to only have two static methods:
> > >
> > > > as(final String name);
> > >
> > > and
> > >
> > > > with(final StreamPartitioner<K, Void> partitioner,
> > > >      final StreamPartitioner<KO, Void> otherPartitioner);
> > >
> > > The second one should allow passing in `null` so that only one of the
> > > two partitioners is set.
> > >
> > > Curious to hear what others think.
> > >
> > >
> > > -Matthias
> > >
> > > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for having a look at the KIP! I've updated it with your
> > > > suggestion to introduce a new `TableJoined` object with partitioners
> > > > of type `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`,
> > > > and to deprecate the existing FK join methods which accept a `Named`
> > > > object accordingly. I agree it makes sense to keep the number of join
> > > > interfaces smaller.
> > > >
> > > > Thanks,
> > > > Victoria
> > > >
> > > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > >
> > > > > Thanks for the KIP Victoria.
> > > > >
> > > > > As pointed out on the Jira ticket by you, using `<K,V>` and `<KO,VO>`
> > > > > as partitioner types does not really work, because we don't have
> > > > > access to the right value on the left side nor have we access to the
> > > > > left value on the right hand side. -- I like your idea to use `Void`
> > > > > as value types to make it clear to the users that partitioning must
> > > > > be done on the key only.
> > > > >
> > > > > For the proposed public API change, I would propose not to pass the
> > > > > partitioners directly, but to introduce a config object (similar to
> > > > > `Joined` for stream-table joins, and `StreamJoined` for stream-stream
> > > > > joins). This new object could also implement `NamedOperation` and
> > > > > thus replace `Named`. To this end, we would deprecate the existing
> > > > > methods using `Named` and replace them with the new methods. Net
> > > > > benefit is, that we don't get more overloads (after we removed the
> > > > > deprecated ones).
> > > > >
> > > > > Not sure how we want to call the new object. Maybe `TableJoined` in
> > > > > alignment to `StreamJoined`?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I've opened a small KIP for adding Kafka Streams support for
> > > > > > foreign key joins on tables with custom partitioners:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > > >
> > > > > > Feedback appreciated. Thanks!
> > > > > >
> > > > > > - Victoria
> > > > > >
> > > > >
> > > >
> > >
>
>
>

-- 
-- Guozhang
