Thanks for the KIP, Victoria! This is a great catch. As far as my memory serves, we totally missed this case in the design of FK joins.

tl;dr: I'm wondering if it might be better to instead introduce a way to specify the partitioning scheme when we create a KTable and then just use it when we join, rather than to specify it in the join itself.

I was initially surprised to see the proposal to add these partitioners in the join operation itself rather than inheriting them from the tables on both sides. But I reviewed the Streams DSL and see that we _cannot_ inherit them because, almost all the time, the input KTables' partitioning is not known!

It seems like the only times we specify a partitioner on a DSL object are:
1. when we produce to an output topic via KStream#to or KStream#through.
2. when we repartition via KStream#repartition.

These both specify the partitioner to use in output operations (i.e., we are telling Streams the partition *to use*). There's currently only one situation in which we have to _inform_ Streams about the partitioning of a KTable or KStream:
3. when we issue a key-based query via IQ, we need to know the partitioner, so the IQ interface allows us to pass in a custom partitioner with the query.

This is a bit weird. Taking a step back, the partitioning scheme is a property of the table itself, not of the query (or the join). Specifying a table property as part of a query (or join) on the table seems to be an indication that the KTable definition itself is lacking something.

Perhaps a more comprehensive approach would be to add an optional StreamPartitioner parameter to StreamsBuilder#table. Then, queries and joins (and anything else that is partitioner-sensitive now and in the future) could simply inherit the partitioner via the topology.

WDYT?

Thanks,
-John

On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> Hi Victoria,
>
> Thanks for the KIP; this is a beneficial addition.
>
> I'm a +1 on the KIP and the changes made:
>
> - Using a config object TableJoined
> - Limiting the static methods to two
>
> I have an additional "wild" thought about rolling the `Function<V, KO> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes, byte[]>>` into TableJoined to align and reduce the number of KTable interfaces. But I don't have a strong opinion on this; I am curious to see what others think about this possibility.
>
> -Bill
>
> On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for updating the KIP.
> >
> > One nit:
> >
> > > The existing methods which accept Named will be marked for deprecation in 4.0.
> >
> > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and (2) a KIP could always slip into a future release.
> >
> >
> > About `TableJoined`: It seems you propose to add static methods for all possible parameter combinations. We usually try to avoid this to keep the number of methods low; if we add too many methods, it defeats the purpose of using a "builder-like" config object.
> >
> > To me, it seems sufficient to have only two static methods:
> >
> > > as(final String name);
> >
> > and
> >
> > > with(final StreamPartitioner<K, Void> partitioner,
> > >      final StreamPartitioner<KO, Void> otherPartitioner);
> >
> > The second one should allow passing in `null` to set only one of the two partitioners.
> >
> > Curious to hear what others think.
> >
> >
> > -Matthias
> >
> > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > Hi Matthias,
> > >
> > > Thanks for having a look at the KIP! I've updated it with your suggestion to introduce a new `TableJoined` object with partitioners of type `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`, and to deprecate the existing FK join methods which accept a `Named` object accordingly. I agree it makes sense to keep the number of join interfaces smaller.
> > >
> > > Thanks,
> > > Victoria
> > >
> > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > > Thanks for the KIP, Victoria.
> > > >
> > > > As you pointed out on the Jira ticket, using `<K,V>` and `<KO,VO>` as partitioner types does not really work, because we don't have access to the right value on the left side, nor do we have access to the left value on the right-hand side. -- I like your idea to use `Void` as the value types to make it clear to users that partitioning must be done on the key only.
> > > >
> > > > For the proposed public API change, I would propose not to pass the partitioners directly, but to introduce a config object (similar to `Joined` for stream-table joins, and `StreamJoined` for stream-stream joins). This new object could also implement `NamedOperation` and thus replace `Named`. To this end, we would deprecate the existing methods using `Named` and replace them with the new methods. The net benefit is that we don't get more overloads (after we remove the deprecated ones).
> > > >
> > > > Not sure what we want to call the new object. Maybe `TableJoined`, in alignment with `StreamJoined`?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > Hi,
> > > > >
> > > > > I've opened a small KIP for adding Kafka Streams support for foreign key joins on tables with custom partitioners:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > >
> > > > > Feedback appreciated. Thanks!
> > > > >
> > > > > - Victoria
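To make the shape of Matthias's `TableJoined` suggestion concrete, here is a minimal, standalone Java sketch of a config object with only the two proposed static methods, `as(name)` and `with(partitioner, otherPartitioner)`, where either partitioner may be `null` and both partitioners see only the key (their value type is `Void`). Everything in it is hypothetical and for illustration only: `KeyOnlyPartitioner` stands in for `StreamPartitioner<K, Void>`, `TableJoinedSketch` stands in for the proposed `TableJoined`, and the `subscriptionPartition`/`responsePartition` helpers assume that the foreign-key join routes each subscription by its foreign key using the right table's partitioner, and each response by the primary key using the left table's partitioner. The fallback used when a partitioner is `null` is a plain `hashCode` placeholder, not Kafka's actual default partitioning.

```java
import java.util.Objects;

// HYPOTHETICAL stand-in for Kafka Streams' StreamPartitioner<K, Void>; not the real interface.
// The value type is fixed to Void, so an implementation can only decide based on the key.
@FunctionalInterface
interface KeyOnlyPartitioner<K> {
    int partition(String topic, K key, Void value, int numPartitions);
}

// HYPOTHETICAL, simplified model of the proposed TableJoined config object (not the Kafka class).
final class TableJoinedSketch<K, KO> {
    private final KeyOnlyPartitioner<K> partitioner;       // left (primary) table; null = default
    private final KeyOnlyPartitioner<KO> otherPartitioner; // right (foreign-key) table; null = default
    private final String name;                             // processor name; null = generated

    private TableJoinedSketch(KeyOnlyPartitioner<K> partitioner,
                              KeyOnlyPartitioner<KO> otherPartitioner,
                              String name) {
        this.partitioner = partitioner;
        this.otherPartitioner = otherPartitioner;
        this.name = name;
    }

    // Static entry point 1: only set the operation name.
    static <K, KO> TableJoinedSketch<K, KO> as(String name) {
        return new TableJoinedSketch<>(null, null, Objects.requireNonNull(name, "name"));
    }

    // Static entry point 2: either argument may be null to keep that side on the default.
    static <K, KO> TableJoinedSketch<K, KO> with(KeyOnlyPartitioner<K> partitioner,
                                                 KeyOnlyPartitioner<KO> otherPartitioner) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, null);
    }

    // Builder-style setter: returns a new immutable instance instead of mutating this one.
    TableJoinedSketch<K, KO> withName(String newName) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, newName);
    }

    String name() { return name; }
    KeyOnlyPartitioner<K> partitioner() { return partitioner; }
    KeyOnlyPartitioner<KO> otherPartitioner() { return otherPartitioner; }

    // ASSUMED routing: a subscription keyed by foreign key must land where the right table keeps it.
    int subscriptionPartition(String topic, KO foreignKey, int numPartitions) {
        return route(otherPartitioner, topic, foreignKey, numPartitions);
    }

    // ASSUMED routing: a response keyed by primary key must land where the left table keeps it.
    int responsePartition(String topic, K primaryKey, int numPartitions) {
        return route(partitioner, topic, primaryKey, numPartitions);
    }

    private static <T> int route(KeyOnlyPartitioner<T> p, String topic, T key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Placeholder default based on hashCode; Kafka's real default hashes the serialized key bytes.
        int partition = (p == null)
            ? Math.floorMod(Objects.hashCode(key), numPartitions)
            : p.partition(topic, key, null, numPartitions); // value is always null (Void)
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalStateException("partitioner returned out-of-range partition " + partition);
        }
        return partition;
    }
}
```

For example, with `KeyOnlyPartitioner<String> byRegion = (topic, key, value, n) -> key.startsWith("eu-") ? 0 : n - 1;`, the config `TableJoinedSketch.<String, String>with(null, byRegion)` routes a subscription for foreign key `eu-42` to partition 0 of a two-partition topic, while the left side stays on the placeholder default. Fixing the value type to `Void` means a partitioner cannot accidentally depend on a value that is not available on the other side of the join.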