Thanks for the KIP, Victoria!

This is a great catch. As far as my memory serves, we
totally missed this case in the design of FK joins.

tl;dr: I'm wondering if it might be better to instead
introduce a way to specify the partitioning scheme when we
create a KTable and then just use it when we join, rather
than to specify it in the join itself.

I was initially surprised to see the proposal to add these
partitioners in the join operation itself rather than
inheriting them from the tables on both sides, but I
reviewed the Streams DSL and see that we _cannot_ inherit it
because almost all the time, the input KTables' partitioning
is not known!

It seems like the only times we specify a partitioner on a
DSL object are:
1. when we produce to an output topic via KStream#to or
KStream#through
2. when we repartition via KStream#repartition

These both specify the partitioner to use in output
operations (i.e., we are telling Streams the partitioner *to
use*); there's currently only one situation in which we have
to _inform_ Streams about the partitioning of a KTable or
KStream:
3. when we issue a key-based query via IQ, we need to know
the partitioner, so the IQ interface allows us to pass in a
custom partitioner with the query.

This is a bit weird. Taking a step back, the partitioning
scheme is a property of the table itself, not of the query
(or the join). Specifying a table property as part of a
query (or join) on the table seems to be an indication that
the KTable definition itself is lacking something.

Perhaps a more comprehensive approach would be to add an
optional StreamPartitioner parameter to StreamsBuilder#table.
Then, queries and joins (and anything else that is
partitioner-sensitive, now and in the future) could simply
inherit the partitioner via the topology.
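
To make the idea concrete, here is a rough, self-contained
sketch in plain Java. It does not use the real Streams API:
KeyPartitioner, TableDef, hashPartitioner and byRegion are
hypothetical names invented for this example, and the hash
below is a simplification (the real default hashes the
serialized key bytes). It only illustrates how a table that
carries its own partitioner lets a join or query look up the
right partition without being told the scheme again.

```java
import java.util.Objects;

final class TablePartitioningSketch {

    // Hypothetical key-only stand-in for a partitioner; NOT the real StreamPartitioner.
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Simplified default scheme: hash of the key object, mapped into [0, numPartitions).
    static <K> KeyPartitioner<K> hashPartitioner() {
        return (key, numPartitions) -> Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Example custom scheme: keys such as "eu-42" go to partition 0, everything else to 1.
    static KeyPartitioner<String> byRegion() {
        return (key, numPartitions) -> key.startsWith("eu-") ? 0 : (1 % numPartitions);
    }

    // Hypothetical table definition that records how its source topic is partitioned.
    static final class TableDef<K> {
        final String topic;
        final int numPartitions;
        final KeyPartitioner<K> partitioner;

        TableDef(String topic, int numPartitions, KeyPartitioner<K> partitioner) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.partitioner = partitioner;
        }

        // Partitioner-sensitive operations (joins, key queries) ask the table, not the caller.
        int partitionFor(K key) {
            return partitioner.partition(key, numPartitions);
        }
    }

    public static void main(String[] args) {
        TableDef<String> customers = new TableDef<>("customers", 4, byRegion());
        String key = "eu-42";

        // Inherited from the table definition: always matches how the data was written.
        int inherited = customers.partitionFor(key);

        // What a join or query would compute if it simply assumed default hashing.
        int assumedDefault = TablePartitioningSketch.<String>hashPartitioner()
                .partition(key, customers.numPartitions);

        System.out.println("partition from the table's partitioner: " + inherited);
        System.out.println("partition if default hashing is assumed: " + assumedDefault);
    }
}
```

Whenever the two printed values differ, an operation that
assumes default hashing would look for the key in the wrong
partition, which is the failure mode the KIP is about.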

WDYT?

Thanks,
-John

On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> Hi Victoria,
> 
> Thanks for the KIP; this is a beneficial addition.
> 
> I'm a +1 on the KIP and the changes made:
> 
>    - Using a config object TableJoined
>    - Limiting the static methods to two
> 
> I have an additional "wild" thought about rolling the `Function<V, KO>
> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> byte[]>>` into TableJoined to align and reduce the number of KTable
> interfaces.  But I don't have a strong opinion on this; I am curious to see
> what others think about this possibility.
> 
> -Bill
> 
> On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
> 
> > Thanks for updating the KIP.
> > 
> > One nit:
> > 
> > > The existing methods which accept Named will be marked for deprecation
> > > in 4.0.
> > 
> > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and (2)
> > a KIP could always slip into a future release.
> > 
> > 
> > About `TableJoined`: It seems you propose to add static methods for all
> > possible parameter combinations. We usually try to avoid this to keep the
> > number of methods low; if we add too many methods, it defeats the
> > purpose of using a "builder like" config object.
> > 
> > To me, it seems sufficient to only have two static methods:
> > 
> > > as(final String name);
> > 
> > and
> > 
> > > with(final StreamPartitioner<K, Void> partitioner,
> > >      final StreamPartitioner<KO, Void> otherPartitioner);
> > 
> > The second one should allow to pass in `null` to only set one of both
> > partitioners.
> > 
> > Curious to hear what others think.
> > 
> > 
> > -Matthias
> > 
> > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > Hi Matthias,
> > > 
> > > Thanks for having a look at the KIP! I've updated it with your suggestion
> > > to introduce a new `TableJoined` object with partitioners of type
> > > `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`, and to
> > > deprecate the existing FK join methods which accept a `Named` object
> > > accordingly. I agree it makes sense to keep the number of join interfaces
> > > smaller.
> > > 
> > > Thanks,
> > > Victoria
> > > 
> > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > 
> > > > Thanks for the KIP Victoria.
> > > > 
> > > > As pointed out on the Jira ticket by you, using `<K,V>` and `<KO,VO>` as
> > > > partitioner types does not really work, because we don't have access to
> > > > the right value on the left side nor have we access to the left value on
> > > > the right hand side. -- I like your idea to use `Void` as value types to
> > > > make it clear to the users that partitioning must be done on the key only.
> > > > 
> > > > For the proposed public API change, I would propose not to pass the
> > > > partitioners directly, but to introduce a config object (similar to
> > > > `Joined` for stream-table joins, and `StreamJoined` for stream-stream
> > > > joins). This new object could also implement `NamedOperation` and thus
> > > > replace `Named`. To this end, we would deprecate the existing methods
> > > > using `Named` and replace them with the new methods. Net benefit is,
> > > > that we don't get more overloads (after we removed the deprecated ones).
> > > > 
> > > > Not sure how we want to call the new object. Maybe `TableJoined` in
> > > > alignment to `StreamJoined`?
> > > > 
> > > > 
> > > > -Matthias
> > > > 
> > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > Hi,
> > > > > 
> > > > > I've opened a small KIP for adding Kafka Streams support for foreign key
> > > > > joins on tables with custom partitioners:
> > > > > 
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > > 
> > > > > Feedback appreciated. Thanks!
> > > > > 
> > > > > - Victoria
> > > > > 
> > > > 
> > > 
> > 

