Hi Victoria,

Earlier in the thread, I proposed rolling `Function<V, KO> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes, byte[]>>` into TableJoined as well. From that perspective, I partially agree with John's latest alternative, but I think we should still retain the `leftJoin`, `join`, and `outerJoin` methods on the KTable interface.

I favor this approach because it would get the KTable interface down to three join methods (after the deprecated ones are removed in the next major release). But I realize this is a bit beyond the intention of this KIP, so I'd still support it as is.

Thanks,
Bill

On Wed, Sep 22, 2021 at 6:54 PM John Roesler <vvcep...@apache.org> wrote:
> Hi Victoria,
>
> Here's the other concern I mentioned. I didn't bother bringing it up before because it would be obviated by the last proposal.
>
> I'm wondering about the need to add four new overloads and a new config object. We already have 20 KTable#join overloads, out of a total of 47 KTable operations. In other words, after this proposal, almost 50% of the KTable interface will be devoted to slightly different variations of join().
>
> This kind of proliferation has a number of downsides, which I documented in my proposal to fix it here:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
>
> We decided in the past to defer a complete refactoring of the Streams DSL because we didn't foresee things becoming much worse than they were at the time. Not to harp on this particular KIP, but it does sort of seem like we're approaching critical mass with 24 join overloads.
>
> What do you think about seeking some kind of middle ground by pulling more of the optional join arguments into the proposed TableJoined interface? After all, starting to chip away at this problem was the idea behind making these config objects implement NamedOperation to begin with.
>
> Specifically, the idea would be to add just one overload:
> KTable#join(TableJoined<K, KO, VO, VR>)
>
> And the TableJoined interface would have one static method with the required `KTable<KO, VO> other` and `ValueJoiner<V, VO, VR> joiner` arguments.
>
> The rest of the arguments (isLeft, fkExtractor, materialized, etc.) would be settable using `withXYZ` builder methods.
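A minimal Java sketch of the shape John describes, assuming in-memory stand-ins so it runs without Kafka: a `Map` plays the role of `KTable<KO, VO>`, a `BiFunction` plays `ValueJoiner<V, VO, VR>`, and `TableJoinedSketch`, `SingleJoinOverload`, and all of their methods are hypothetical names, not the Kafka Streams API. The single static factory takes only the required arguments; the join type, foreign-key extractor, and name are optional `withXYZ` settings (materialization is left out for brevity).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical builder-style join config. Map stands in for KTable and
// BiFunction for ValueJoiner; this is NOT the Kafka Streams API.
final class TableJoinedSketch<V, KO, VO, VR> {
    final Map<KO, VO> other;              // required: the right-hand table
    final BiFunction<V, VO, VR> joiner;   // required: combines left and right values
    Function<V, KO> foreignKeyExtractor;  // optional: null means a key-to-key join
    boolean leftJoin = false;             // optional: inner join unless set
    String name;                          // optional: processor name (NamedOperation)

    private TableJoinedSketch(Map<KO, VO> other, BiFunction<V, VO, VR> joiner) {
        this.other = Objects.requireNonNull(other, "other");
        this.joiner = Objects.requireNonNull(joiner, "joiner");
    }

    // The only static factory: it takes just the required arguments.
    static <V, KO, VO, VR> TableJoinedSketch<V, KO, VO, VR> with(
            Map<KO, VO> other, BiFunction<V, VO, VR> joiner) {
        return new TableJoinedSketch<>(other, joiner);
    }

    // Optional arguments are set through withXYZ builder methods.
    TableJoinedSketch<V, KO, VO, VR> withForeignKeyExtractor(Function<V, KO> extractor) {
        this.foreignKeyExtractor = extractor;
        return this;
    }

    TableJoinedSketch<V, KO, VO, VR> withLeftJoin(boolean leftJoin) {
        this.leftJoin = leftJoin;
        return this;
    }

    TableJoinedSketch<V, KO, VO, VR> withName(String name) {
        this.name = name;
        return this;
    }
}

final class SingleJoinOverload {
    // Stand-in for KTable#join(TableJoined): one method whose behavior comes from the config.
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> left, TableJoinedSketch<V, KO, VO, VR> spec) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            // FK join if an extractor is set; otherwise look up the left key itself.
            Object lookupKey = spec.foreignKeyExtractor != null
                    ? spec.foreignKeyExtractor.apply(entry.getValue())
                    : entry.getKey();
            VO rightValue = spec.other.get(lookupKey);
            // Inner join drops unmatched rows; left join keeps them with a null right value.
            if (rightValue != null || spec.leftJoin) {
                result.put(entry.getKey(), spec.joiner.apply(entry.getValue(), rightValue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new LinkedHashMap<>(); // order id -> customer id
        orders.put("order-1", "cust-A");
        orders.put("order-2", "cust-Z");                    // no matching customer
        Map<String, String> customers = Map.of("cust-A", "Alice");

        Map<String, String> inner = join(orders,
                TableJoinedSketch.<String, String, String, String>with(customers, (o, c) -> o + "/" + c)
                        .withForeignKeyExtractor(customerId -> customerId));
        Map<String, String> leftJoined = join(orders,
                TableJoinedSketch.<String, String, String, String>with(customers, (o, c) -> o + "/" + c)
                        .withForeignKeyExtractor(customerId -> customerId)
                        .withLeftJoin(true));
        System.out.println(inner);      // {order-1=cust-A/Alice}
        System.out.println(leftJoined); // {order-1=cust-A/Alice, order-2=cust-Z/null}
    }
}
```

With this shape, the inner/left distinction becomes a config flag instead of a method name, which is the trade-off Bill's reply weighs when he suggests keeping `join`, `leftJoin`, and `outerJoin` as separate methods.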
>
> If that proposal isn't clear, I'd be happy to provide a code sketch.
>
> Thanks in advance for your consideration, and thanks again for driving this,
> -John
>
>
> On Wed, 2021-09-22 at 17:34 -0500, John Roesler wrote:
> > Thanks, Guozhang!
> >
> > I agree it would be bigger in scope to approach it that way. We certainly would need to implement a way for the partitioner to flow down the topology, like we do with serdes and grace period information. It's not super complex, but not trivial either.
> >
> > I think we've always been mildly uncomfortable with assumption (1), but we just kind of hope for the best for performance reasons. But that's because the alternative we considered in the past was to proactively repartition all inputs. If the question is instead just getting a partitioner and validating that the partitioning of the table is correct, I think the performance question is much different. However, we have also gone a really long time with assumption (1) in place, and I don't believe it has ever been an issue for anyone.
> >
> > I appreciate your consideration. I still think it would be nice to approach it more from a data provenance perspective, but I don't feel strongly about it. I'm happy to leave it up to Victoria's discretion.
> >
> > I have one more comment about the KIP, but I'll send a separate message for clarity.
> >
> > Thanks again for considering it,
> > -John
> >
> > On Wed, 2021-09-22 at 14:50 -0700, Guozhang Wang wrote:
> > > Thanks Victoria for writing the KIP! I think this was a miss when we designed KIP-213, and it should be fixed in the syntax. Regarding how large its scope should be, here are my thoughts:
> > >
> > > 1) Today Streams does not take any indication of how the input streams/tables are partitioned; instead, it simply assumes that each input stream/table is always already partitioned by its key.
> > > This is by design: we let users make sure this is always the case. For internal repartitioning such as `groupBy`s, Streams does guarantee that the repartition topics are partitioned by the new grouping keys, but nevertheless the assumption still holds: for every stream/table entity defined from an input topic (whether external or internal), that topic is partitioned by the stream/table key, and hence at the moment we do not require any partitioners to be passed in, since we do not need them.
> > >
> > > 2) For all operators that need to write to a partition, today the partitioners are either defined by the operator logic itself (think: groupBys, where the partitioner is hard-coded to use the grouping key) or user-customized (think: the #through/to APIs). We do not have any scenarios where we need to "inherit" partitioners from parent operators, until FK joins. Here, we need to make sure that: a) the left table A's input topic and the internal response topic are co-partitioned; b) the right table B's input topic and the internal subscription topic are co-partitioned. Of course, if both left table A's and right table B's input topics are partitioned by the default partitioner, then both conditions hold. But the assumption above only requires that "the topic is partitioned by the key", not that "the topic is partitioned by the key, following exactly the default partitioner mechanism". For example, in https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a different partitioner that is not based on hashing the bytes is used, which still guarantees that "the input topic is partitioned by key".
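The co-partitioning failure Guozhang describes can be shown with two toy partitioners that both partition by key but disagree on where a given key goes. In this hedged sketch, `rangePartition` is a hypothetical custom partitioner, and `hashPartition` is only a stand-in for a default hash-based partitioner: it applies `Arrays.hashCode` to the UTF-8 key bytes, whereas Kafka's actual default partitioner uses murmur2.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class CoPartitioningDemo {
    // Stand-in for a default, hash-based partitioner over the serialized key bytes.
    // Kafka's actual default uses murmur2; Arrays.hashCode keeps this sketch dependency-free.
    static int hashPartition(int key, int numPartitions) {
        byte[] serialized = String.valueOf(key).getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(serialized), numPartitions);
    }

    // Hypothetical custom partitioner: still "partitioned by key", but by key range, not by hash.
    static int rangePartition(int key, int numPartitions) {
        return Math.floorMod(key / 100, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int key = 0;
        // The left table's input topic was written with the custom range partitioner...
        int inputPartition = rangePartition(key, numPartitions);
        // ...but an internal topic written with the default routes the same key elsewhere,
        // so the two topics are not co-partitioned for this key.
        int internalPartition = hashPartition(key, numPartitions);
        System.out.println("input=" + inputPartition + ", internal=" + internalPartition); // input=0, internal=3
    }
}
```

Both partitioners satisfy "the topic is partitioned by the key", yet they place key `0` on different partitions, which is why the FK join's internal topics cannot simply assume the default.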
> > >
> > > So I feel that if assumption 1) above should still hold in Streams in the long run, it's not very meaningful to require the source tables to indicate their partitioners; we only need the FK join operator itself to make sure that co-partitioning conditions a) and b) above hold. Of course, the easiest way is to require users to pass in the partitioners for those two internal topics, and they have to make sure these are the same as the input partitioners. We could also take a somewhat more complicated approach and have some "inheritance" rule for partitioners when they are given at the sink (we have similar rules for serde inheritance throughout the topology), but that only fixes the "#through / #repartition" cases, not the source "builder#table" cases; i.e., we would still need to require users to indicate partitioners, which we could hopefully inherit within the topology.
> > >
> > > I agree that for IQ, it would be beneficial to have the partitioner for each table/stream entity so that IQ itself does not need the partitioners to be specified, but to make this fix work, we'd probably need both 1) source table/stream partitioner specification and 2) inheritance rules (otherwise we'd have to force users to specify the same for #repartition etc. as well?). Given that rationale, I'm slightly leaning towards the current proposal in the KIP doc, i.e., just fixing this for the FK operator only, with the easy approach that requires users themselves to set partitioners accordingly.
> > >
> > > Guozhang
> > >
> > > On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org> wrote:
> > > > Thanks for the KIP, Victoria!
> > > >
> > > > This is a great catch. As far as my memory serves, we totally missed this case in the design of FK joins.
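Under Guozhang's "easy approach", users must pass partitioners that match how the input topics were actually written, and John's reply above mentions validating a table's partitioning. A minimal sketch of such a check, assuming records can be observed together with the partition they were read from: `KeyPartitioner` only mirrors the shape of Kafka Streams' `StreamPartitioner#partition(topic, key, value, numPartitions)` with a `Void` value, and `ObservedRecord` and `PartitioningValidator` are hypothetical names, not Streams internals.

```java
import java.util.List;

// Stand-in mirroring the shape of StreamPartitioner#partition with a Void value; not the Kafka interface.
interface KeyPartitioner<K> {
    Integer partition(String topic, K key, Void value, int numPartitions);
}

// Hypothetical: a record key together with the partition it was read from.
final class ObservedRecord<K> {
    final K key;
    final int partition;

    ObservedRecord(K key, int partition) {
        this.key = key;
        this.partition = partition;
    }
}

final class PartitioningValidator {
    // Returns the first record whose observed partition disagrees with the supplied partitioner,
    // or null if every record sits where the partitioner says it should.
    // A null result from the partitioner is treated as a mismatch in this sketch.
    static <K> ObservedRecord<K> firstMismatch(String topic, List<ObservedRecord<K>> records,
                                               KeyPartitioner<K> partitioner, int numPartitions) {
        for (ObservedRecord<K> record : records) {
            Integer expected = partitioner.partition(topic, record.key, null, numPartitions);
            if (expected == null || expected != record.partition) {
                return record;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical user-supplied partitioner: keys 0-99 -> partition 0, 100-199 -> 1, ...
        KeyPartitioner<Integer> byRange = (topic, key, value, n) -> Math.floorMod(key / 100, n);
        List<ObservedRecord<Integer>> observed = List.of(
                new ObservedRecord<>(5, 0),
                new ObservedRecord<>(150, 1),
                new ObservedRecord<>(7, 3)); // written by some other partitioner
        ObservedRecord<Integer> bad = firstMismatch("orders", observed, byRange, 4);
        System.out.println(bad == null ? "consistent" : "mismatch for key " + bad.key); // mismatch for key 7
    }
}
```

A check like this only samples what has been observed, so it can catch a wrong partitioner but cannot prove a right one.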
> > > >
> > > > tl;dr: I'm wondering if it might be better to instead introduce a way to specify the partitioning scheme when we create a KTable and then just use it when we join, rather than to specify it in the join itself.
> > > >
> > > > I was initially surprised to see the proposal to add these partitioners in the join operation itself rather than inheriting them from the tables on both sides, but I reviewed the Streams DSL and see that we _cannot_ inherit them because, almost all the time, the input KTables' partitioning is not known!
> > > >
> > > > It seems like the only times we specify a partitioner on a DSL object are:
> > > > 1. when we produce to an output topic via KStream#to or KStream#through.
> > > > 2. when we repartition via KStream#repartition.
> > > >
> > > > These both specify the partitioner to use in output operations (i.e., we are telling Streams the partition *to use*); there's currently only one situation in which we have to _inform_ Streams about the partitioning of a KTable or KStream:
> > > > 3. when we issue a key-based query via IQ, we need to know the partitioner, so the IQ interface allows us to pass in a custom partitioner with the query.
> > > >
> > > > This is a bit weird. Taking a step back, the partitioning scheme is a property of the table itself, not of the query (or the join). Specifying a table property as part of a query (or join) on the table seems to be an indication that the KTable definition itself is lacking something.
> > > >
> > > > Perhaps a more comprehensive approach would be to add an optional StreamPartitioner parameter to StreamsBuilder#table. Then, queries and joins (and anything else that is partitioner-sensitive, now and in the future) could simply inherit the partitioner via the topology.
> > > >
> > > > WDYT?
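A hedged sketch of John's alternative, in which the partitioner is declared once with the table and then inherited by anything partitioner-sensitive. Everything here is hypothetical: `TablePartitioner` only mirrors the shape of `StreamPartitioner<K, Void>`, `table(...)` stands in for a `StreamsBuilder#table` overload of the kind John proposes, and the fallback `defaultPartitioner` uses `key.hashCode()` rather than Kafka's murmur2-based default.

```java
import java.util.Objects;
import java.util.Optional;

// Stand-in mirroring StreamPartitioner<K, Void>#partition; not the Kafka interface.
interface TablePartitioner<K> {
    Integer partition(String topic, K key, Void value, int numPartitions);
}

// Hypothetical table definition that records how its source topic is partitioned.
final class TableDef<K> {
    final String sourceTopic;
    final Optional<TablePartitioner<K>> partitioner; // empty: default partitioning is assumed

    TableDef(String sourceTopic, TablePartitioner<K> partitioner) {
        this.sourceTopic = Objects.requireNonNull(sourceTopic, "sourceTopic");
        this.partitioner = Optional.ofNullable(partitioner);
    }
}

final class InheritedPartitioning {
    // Stand-in default that hashes key.hashCode(); Kafka's real default hashes the
    // serialized key bytes with murmur2.
    static <K> TablePartitioner<K> defaultPartitioner() {
        return (topic, key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical analogue of a StreamsBuilder#table overload taking an optional partitioner.
    static <K> TableDef<K> table(String topic, TablePartitioner<K> partitioner) {
        return new TableDef<>(topic, partitioner);
    }

    // Joins and IQ lookups resolve the partitioner from the table definition
    // instead of asking the caller to repeat it.
    static <K> TablePartitioner<K> resolve(TableDef<K> table) {
        return table.partitioner.orElseGet(() -> InheritedPartitioning.<K>defaultPartitioner());
    }

    // Example consumer: the partition a key lookup (or FK-join response) should target.
    static <K> int partitionFor(TableDef<K> table, K key, int numPartitions) {
        return resolve(table).partition(table.sourceTopic, key, null, numPartitions);
    }

    public static void main(String[] args) {
        TablePartitioner<Integer> byRange = (topic, key, value, n) -> Math.floorMod(key / 100, n);
        TableDef<Integer> orders = table("orders", byRange);      // custom partitioning declared once
        TableDef<String> customers = table("customers", null);    // default partitioning
        System.out.println(partitionFor(orders, 250, 4));         // 2
        System.out.println(partitionFor(customers, "cust-A", 4)); // some partition in [0, 4)
    }
}
```

The design choice here is that partitioning becomes table metadata, so a join or a key lookup never needs its own partitioner argument; as Guozhang notes, this only helps if every source table declares its partitioner and the DSL propagates it through operations such as `repartition`.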
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> > > > > Hi Victoria,
> > > > >
> > > > > Thanks for the KIP; this is a beneficial addition.
> > > > >
> > > > > I'm a +1 on the KIP and the changes made:
> > > > >
> > > > > - Using a config object TableJoined
> > > > > - Limiting the static methods to two
> > > > >
> > > > > I have an additional "wild" thought about rolling the `Function<V, KO> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes, byte[]>>` into TableJoined to align and reduce the number of KTable interfaces. But I don't have a strong opinion on this; I am curious to see what others think about this possibility.
> > > > >
> > > > > -Bill
> > > > >
> > > > > On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > Thanks for updating the KIP.
> > > > > >
> > > > > > One nit:
> > > > > >
> > > > > > > The existing methods which accept Named will be marked for deprecation in 4.0.
> > > > > >
> > > > > > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0), and (2) a KIP could always slip into a future release.
> > > > > >
> > > > > >
> > > > > > About `TableJoined`: it seems you propose to add static methods for all possible parameter combinations. We usually try to avoid this to keep the number of methods low; if we add too many methods, it defeats the purpose of using a "builder-like" config object.
> > > > > >
> > > > > > To me, it seems sufficient to have only two static methods:
> > > > > >
> > > > > > > as(final String name);
> > > > > >
> > > > > > and
> > > > > >
> > > > > > > with(final StreamPartitioner<K, Void> partitioner,
> > > > > > >      final StreamPartitioner<KO, Void> otherPartitioner);
> > > > > >
> > > > > > The second one should allow passing in `null` to set only one of the two partitioners.
> > > > > >
> > > > > > Curious to hear what others think.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > > > > > Hi Matthias,
> > > > > > >
> > > > > > > Thanks for having a look at the KIP! I've updated it with your suggestion to introduce a new `TableJoined` object with partitioners of type `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`, and to deprecate the existing FK join methods that accept a `Named` object accordingly. I agree it makes sense to keep the number of join interfaces smaller.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Victoria
> > > > > > >
> > > > > > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > Thanks for the KIP, Victoria.
> > > > > > > >
> > > > > > > > As you pointed out on the Jira ticket, using `<K,V>` and `<KO,VO>` as partitioner types does not really work, because we have access neither to the right value on the left side nor to the left value on the right-hand side. I like your idea to use `Void` as the value types to make it clear to users that partitioning must be done on the key only.
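A minimal sketch of how the two factories above might drive the FK join's internal routing, assuming key-only partitioners as Matthias and Victoria describe. `KeyOnlyPartitioner`, `TableJoinedConfig`, and `FkJoinRouting` are hypothetical stand-ins (not the `TableJoined` class proposed in the KIP), the topic names passed to the partitioners are placeholders, and `defaultPartition` uses `hashCode()` instead of Kafka's murmur2 default. Following Guozhang's conditions a) and b), subscription records (keyed by the foreign key) follow the right table's partitioner, and response records (keyed by the left table's primary key) follow the left table's.

```java
import java.util.Objects;

// Stand-in mirroring StreamPartitioner<K, Void>: only the key is available, never the value.
interface KeyOnlyPartitioner<K> {
    Integer partition(String topic, K key, Void value, int numPartitions);
}

// Hypothetical config object with the two static factories suggested above.
final class TableJoinedConfig<K, KO> {
    final KeyOnlyPartitioner<K> partitioner;        // null: default partitioning for the left side
    final KeyOnlyPartitioner<KO> otherPartitioner;  // null: default partitioning for the right side
    final String name;                              // null: generated processor name

    private TableJoinedConfig(KeyOnlyPartitioner<K> partitioner,
                              KeyOnlyPartitioner<KO> otherPartitioner, String name) {
        this.partitioner = partitioner;
        this.otherPartitioner = otherPartitioner;
        this.name = name;
    }

    static <K, KO> TableJoinedConfig<K, KO> as(String name) {
        return new TableJoinedConfig<>(null, null, Objects.requireNonNull(name, "name"));
    }

    // Either argument may be null, so only one of the two partitioners needs to be set.
    static <K, KO> TableJoinedConfig<K, KO> with(KeyOnlyPartitioner<K> partitioner,
                                                 KeyOnlyPartitioner<KO> otherPartitioner) {
        return new TableJoinedConfig<>(partitioner, otherPartitioner, null);
    }
}

final class FkJoinRouting {
    // Stand-in default; Kafka's real default hashes the serialized key bytes with murmur2.
    static int defaultPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Subscription records are keyed by the foreign key: follow the right table's partitioning.
    // "subscription" is a placeholder topic name.
    static <K, KO> int subscriptionPartition(TableJoinedConfig<K, KO> config, KO foreignKey, int numPartitions) {
        return config.otherPartitioner == null
                ? defaultPartition(foreignKey, numPartitions)
                : config.otherPartitioner.partition("subscription", foreignKey, null, numPartitions);
    }

    // Response records are keyed by the left primary key: follow the left table's partitioning.
    // "response" is a placeholder topic name.
    static <K, KO> int responsePartition(TableJoinedConfig<K, KO> config, K primaryKey, int numPartitions) {
        return config.partitioner == null
                ? defaultPartition(primaryKey, numPartitions)
                : config.partitioner.partition("response", primaryKey, null, numPartitions);
    }

    public static void main(String[] args) {
        // Left table (orders) uses a custom range partitioner; right table (customers) uses the default.
        KeyOnlyPartitioner<Integer> byRange = (topic, key, value, n) -> Math.floorMod(key / 100, n);
        TableJoinedConfig<Integer, String> config = TableJoinedConfig.with(byRange, null);
        System.out.println(responsePartition(config, 250, 4));          // 2, same as the orders input topic
        System.out.println(subscriptionPartition(config, "cust-A", 4)); // default partition for "cust-A"
    }
}
```

Because the partitioners only ever see a key and a `null` `Void` value, neither side needs the other side's value type, which is the point of typing them as `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`.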
> > > > > > > >
> > > > > > > > For the proposed public API change, I would propose not to pass the partitioners directly, but to introduce a config object (similar to `Joined` for stream-table joins and `StreamJoined` for stream-stream joins). This new object could also implement `NamedOperation` and thus replace `Named`. To this end, we would deprecate the existing methods using `Named` and replace them with the new methods. The net benefit is that we don't get more overloads (after we remove the deprecated ones).
> > > > > > > >
> > > > > > > > Not sure what we want to call the new object. Maybe `TableJoined`, in alignment with `StreamJoined`?
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I've opened a small KIP for adding Kafka Streams support for foreign key joins on tables with custom partitioners:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > > > > > >
> > > > > > > > > Feedback appreciated. Thanks!
> > > > > > > > >
> > > > > > > > > - Victoria