The current proposal to replace the `Named` overloads with `TableJoined` overloads does not increase the number of methods we have. Thus, I don't share your concern, and I also think that even if an API redesign may be good in general, it's not the right thing to piggyback on this KIP.

I also think that we should not make `inner/left/outer` a _configuration_ of the join in any case, because to me, inner/left/outer are really different (first-class) operators.

Furthermore, if we make the key-extractor optional to distinguish between PK and FK joins, we run into the issue that users could pass a key-extractor into an outer join. However, FK joins are not supported for outer joins, so we would lose the current API-level (compile-time) guard and could only do a runtime check, which I personally think would be a step backwards.
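To make the compile-time versus runtime argument concrete, here is a minimal sketch. `SeparateOperators`, `JoinType`, and `JoinConfig` are hypothetical stand-ins, not the real `KTable` API: with separate operators, an FK outer join has no method to call at all, while with a configuration object the same mistake can only be rejected when the object is built.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; this is NOT the Kafka Streams KTable API.
final class JoinGuardSketch {

    // Design 1: inner/left/outer are distinct operators. The FK overloads exist only
    // on join and leftJoin, so an FK outer join does not even compile.
    interface SeparateOperators<K, V> {
        <KO, VO, VR> SeparateOperators<K, VR> join(
                SeparateOperators<KO, VO> other, Function<V, KO> fkExtractor, BiFunction<V, VO, VR> joiner);

        <KO, VO, VR> SeparateOperators<K, VR> leftJoin(
                SeparateOperators<KO, VO> other, Function<V, KO> fkExtractor, BiFunction<V, VO, VR> joiner);

        // Primary-key outer join only: there is no parameter for a key extractor.
        <VO, VR> SeparateOperators<K, VR> outerJoin(
                SeparateOperators<K, VO> other, BiFunction<V, VO, VR> joiner);
    }

    enum JoinType { INNER, LEFT, OUTER }

    // Design 2: the join type and an optional key extractor are configuration.
    // The compiler accepts OUTER together with an extractor, so the
    // restriction can only be enforced at runtime, when the object is built.
    static final class JoinConfig<V, KO> {
        final JoinType type;
        final Function<V, KO> fkExtractor; // null means primary-key join

        JoinConfig(JoinType type, Function<V, KO> fkExtractor) {
            if (type == JoinType.OUTER && fkExtractor != null) {
                throw new IllegalArgumentException("foreign-key joins do not support outer joins");
            }
            this.type = type;
            this.fkExtractor = fkExtractor;
        }
    }
}
```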


-Matthias

On 9/24/21 8:21 AM, Bill Bejeck wrote:
Hi Victoria,

Earlier in the thread, I proposed rolling `Function<V, KO>
foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
byte[]>>` into TableJoined as well. From that perspective, I partially
agree with John's latest alternative, but I think we should still retain
the `leftJoin`, `join`, and `outerJoin` methods on the KTable interface.

I favor this approach as we would be able to get the KTable interface join
methods down to 3 (after removal in the next major release).  But I realize
this is a bit beyond the intention of this KIP, so I'd still support it as
is.
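A minimal sketch of this "three join methods" shape, assuming the optional arguments are bundled into one immutable options object. `JoinOptions`, `Table`, and the `withXYZ` names are hypothetical, `BiFunction` stands in for `ValueJoiner`, and a store name stands in for `Materialized`:

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch; names and signatures are illustrative, not a real Kafka Streams API.
final class ThreeJoinsSketch {

    // Optional join arguments bundled into one immutable config object.
    static final class JoinOptions<V, KO> {
        final Function<V, KO> fkExtractor; // null means primary-key join
        final String storeName;            // stand-in for Materialized<K, VR, KeyValueStore<Bytes, byte[]>>

        private JoinOptions(Function<V, KO> fkExtractor, String storeName) {
            this.fkExtractor = fkExtractor;
            this.storeName = storeName;
        }

        static <V, KO> JoinOptions<V, KO> defaults() {
            return new JoinOptions<>(null, null);
        }

        JoinOptions<V, KO> withForeignKeyExtractor(Function<V, KO> extractor) {
            return new JoinOptions<>(extractor, storeName);
        }

        JoinOptions<V, KO> withStoreName(String name) {
            return new JoinOptions<>(fkExtractor, name);
        }
    }

    // Stand-in for KTable: one method per join type, each taking the options object.
    interface Table<K, V> {
        <KO, VO, VR> Table<K, VR> join(Table<KO, VO> other, BiFunction<V, VO, VR> joiner,
                                       JoinOptions<V, KO> options);

        <KO, VO, VR> Table<K, VR> leftJoin(Table<KO, VO> other, BiFunction<V, VO, VR> joiner,
                                           JoinOptions<V, KO> options);

        <VO, VR> Table<K, VR> outerJoin(Table<K, VO> other, BiFunction<V, VO, VR> joiner,
                                        JoinOptions<V, K> options);
    }
}
```

One consequence of this shape: `outerJoin` also accepts options that may carry an extractor, so an FK outer join would have to be rejected at runtime, which is the trade-off raised at the top of this thread.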

Thanks,
Bill



On Wed, Sep 22, 2021 at 6:54 PM John Roesler <vvcep...@apache.org> wrote:

Hi Victoria,

Here's the other concern I mentioned. I didn't bother
bringing it up before because it would be obviated by the
last proposal.

I'm wondering about the need to add four new overloads and a
new config object. We already have 20 KTable#join overloads,
out of a total of 47 KTable operations. In other words,
after this proposal, almost 50% of the KTable interface will
be devoted to slightly different variations of join().

This kind of proliferation has a number of downsides, which
I documented in my proposal to fix it here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar

We decided in the past to defer completely refactoring all
of the Streams DSL because we didn't foresee things becoming
much worse than they were at the time. Not to harp on this
particular KIP, but it does sort of seem like we're
approaching critical mass with 24 join overloads.

What do you think about seeking some kind of middle ground
by pulling more of the optional join arguments into the
proposed TableJoined interface? After all, starting to chip
away at this problem was the idea behind making these config
objects implement NamedOperation to begin with.

Specifically, this idea would be to add just one overload:
KTable#join(TableJoined<K, KO, VO, VR>)

And the TableJoined interface would have one static method
with the required `KTable<KO, VO> other` and `ValueJoiner<V,
VO, VR> joiner` arguments.

The rest of the arguments (isLeft, fkExtractor,
materialized, etc) would be settable using `withXYZ` builder
methods.

If that proposal isn't clear, I'd be happy to provide a code
sketch.
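One possible reading of this proposal, as a minimal sketch: required arguments go through a static factory and optional ones through immutable `withXYZ` methods. `JoinSpec`, `Table`, and the method names are hypothetical; `JoinSpec` also carries the left value type `V`, which the extractor and joiner need. `BiFunction` stands in for `ValueJoiner` and a store name for `Materialized`.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of a single join(...) entry point; not an existing Kafka Streams API.
final class SingleJoinSketch {

    // Stand-in for KTable<K, V>, reduced to the one proposed join overload.
    interface Table<K, V> {
        <KO, VO, VR> Table<K, VR> join(JoinSpec<V, KO, VO, VR> spec);
    }

    // Stand-in for the proposed TableJoined config object.
    static final class JoinSpec<V, KO, VO, VR> {
        final Table<KO, VO> other;
        final BiFunction<V, VO, VR> joiner; // stand-in for ValueJoiner<V, VO, VR>
        final boolean isLeft;
        final Function<V, KO> fkExtractor;  // null means primary-key join
        final String storeName;             // stand-in for Materialized

        private JoinSpec(Table<KO, VO> other, BiFunction<V, VO, VR> joiner,
                         boolean isLeft, Function<V, KO> fkExtractor, String storeName) {
            this.other = other;
            this.joiner = joiner;
            this.isLeft = isLeft;
            this.fkExtractor = fkExtractor;
            this.storeName = storeName;
        }

        // The only static method: the two required arguments.
        static <V, KO, VO, VR> JoinSpec<V, KO, VO, VR> with(Table<KO, VO> other, BiFunction<V, VO, VR> joiner) {
            return new JoinSpec<>(other, joiner, false, null, null);
        }

        // Optional arguments: each builder returns a new, modified copy.
        JoinSpec<V, KO, VO, VR> withLeft(boolean left) {
            return new JoinSpec<>(other, joiner, left, fkExtractor, storeName);
        }

        JoinSpec<V, KO, VO, VR> withForeignKeyExtractor(Function<V, KO> extractor) {
            return new JoinSpec<>(other, joiner, isLeft, extractor, storeName);
        }

        JoinSpec<V, KO, VO, VR> withStoreName(String name) {
            return new JoinSpec<>(other, joiner, isLeft, fkExtractor, name);
        }
    }
}
```

A call would then read roughly as `table.join(JoinSpec.with(other, joiner).withLeft(true).withForeignKeyExtractor(fk))`.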

Thanks in advance for your consideration, and thanks again
for driving this,
-John


On Wed, 2021-09-22 at 17:34 -0500, John Roesler wrote:
Thanks, Guozhang!

I agree it would be bigger in scope to approach it that way.
We certainly would need to implement a way for the
partitioner to flow down the topology, like we do with
serdes and grace period information. It's not super complex,
but not trivial either.

I think we've always been mildly uncomfortable with
assumption (1), but we just kind of hope for the best for
performance reasons. But that's because the alternative we
considered in the past was to proactively repartition all
inputs. If the question is instead just getting a
partitioner and validating that the partitioning of the
table is correct, I think the performance question is much
different. However, we have also gone a really long time
with assumption (1) in place, and I don't believe it has
ever been an issue for anyone.

I appreciate your consideration. I still think it would be
nice to approach it more from a data provenance perspective,
but I don't feel strongly about it. I'm happy to leave it up
to Victoria's discretion.

I have one more comment about the KIP, but I'll send a
separate message for clarity.

Thanks again for considering it,
-John

On Wed, 2021-09-22 at 14:50 -0700, Guozhang Wang wrote:
Thanks Victoria for writing the KIP! I think this is something we missed when we designed KIP-213, and it should be fixed in the syntax. Regarding how large its scope should be, here are my thoughts:

1) Today Streams does not take any indicator of how the input stream/table is partitioned; instead, it simply assumes that the input stream/table is always already partitioned by the key. This is by design: we let users make sure this is always the case. For internal repartitioning like `groupBy`s, Streams does guarantee that the repartition topics are partitioned by the new grouping keys, but nevertheless the assumption still holds: for all stream/table entities defined from an input topic (whether external or internal), that topic is partitioned by the stream/table key, and hence we currently do not require any partitioners to be passed in, since we do not need them.

2) For all operators that need to write to a partition, today the partitioners are either defined by the operator logic itself (think: `groupBy`s, where the partitioner is hard-coded to use the grouping key) or user-customized (think: the #through/to APIs). We do not have any scenarios where we need to "inherit" partitioners from parent operators, until FK joins. Here, we need to make sure that: a) the left table A's input topic and the internal response topic are co-partitioned; b) the right table B's input topic and the internal subscription topic are co-partitioned. Of course, if both left table A's and right table B's input topics are partitioned by the default partitioner, then both conditions hold. But the assumption above only requires that the "topic is partitioned by the key", not that "the topic is partitioned by the key, following exactly the default partitioner mechanism" (e.g., in https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a different partitioner, one not based on hashing the key bytes, is used, which still guarantees "the input topic is partitioned by key").
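The following sketch illustrates why "partitioned by key" alone is not enough: two partitioners that each depend only on the key can still send the same key to different partition numbers, breaking conditions a) and b). `Partitioner` is a local stand-in that mirrors the shape of `StreamPartitioner#partition(topic, key, value, numPartitions)`; the region and suffix schemes and the topic names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class CoPartitionSketch {

    // Local stand-in mirroring the shape of StreamPartitioner#partition(topic, key, value, numPartitions).
    interface Partitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // Hypothetical scheme 1: keys of the form "region-id" are partitioned by region.
    // It depends only on the key; unknown regions go to the last partition.
    static final Partitioner<String, Void> BY_REGION = (topic, key, value, n) -> {
        String region = key.substring(0, key.indexOf('-'));
        int index = Arrays.asList("us", "eu", "ap").indexOf(region);
        return Math.floorMod(index, n);
    };

    // Hypothetical scheme 2: partition by the trailing digit. Also depends only on the key.
    static final Partitioner<String, Void> BY_SUFFIX = (topic, key, value, n) ->
            Math.floorMod(key.charAt(key.length() - 1) - '0', n);

    // Co-partitioning check: every key must map to the same partition number in the
    // table's input topic and in the internal topic written by the FK join.
    static boolean coPartitioned(Partitioner<String, Void> inputPartitioner,
                                 Partitioner<String, Void> internalPartitioner,
                                 List<String> keys, int numPartitions) {
        for (String key : keys) {
            int expected = inputPartitioner.partition("input-topic", key, null, numPartitions);
            int actual = internalPartitioner.partition("internal-topic", key, null, numPartitions);
            if (expected != actual) {
                return false;
            }
        }
        return true;
    }
}
```

With keys `us-1`, `eu-7`, `ap-3` and three partitions, `BY_REGION` sends `us-1` to partition 0 while `BY_SUFFIX` sends it to partition 1, so a join that writes its internal topic with a different key-based scheme would look for that key in the wrong partition.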

So I feel that if we think assumption 1) above should still hold in Streams in the long run, it's not very meaningful to require the source tables to indicate their partitioners; we should only require the FK join operator itself to make sure that co-partitioning conditions a) and b) above hold. Of course, the easiest way is to require users to pass in the partitioners for those two internal topics, which they have to make sure are the same as the input partitioners. We could also have a somewhat more complicated approach with some "inheritance" rule for partitioners when they are given at the sink (we have similar rules for serde inheritance throughout the topology), but that only fixes the "#through / #repartition" cases, not the source "builder#table" cases; i.e., we would still need to require users to indicate partitioners, which we could then hopefully inherit within the topology.

I agree that for IQ, it would be beneficial to have the partitioner for each table/stream entity so that IQ itself does not need the partitioners to be specified, but to make this fix work, we'd probably need both 1) source table/stream partitioner specification and 2) an inheritance rule (otherwise we'd have to force users to specify the same for #repartition etc. as well?). Given that rationale, I'm slightly leaning towards the current proposal in the KIP doc, i.e., just fixing this for the FK operator only, with the easy approach that requires users themselves to set partitioners accordingly.
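A minimal sketch of this "easy approach", assuming the join simply routes each internal topic with the user-supplied partitioner and falls back to a default when none is given. `Partitioner` mirrors the shape of `StreamPartitioner`; `defaultPartition` is a hypothetical stand-in that uses `hashCode()` (Kafka's real default hashes the serialized key bytes), and the topic names are placeholders.

```java
// Hypothetical sketch; Partitioner only mirrors the shape of StreamPartitioner#partition(...).
final class FkRoutingSketch {

    interface Partitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // Stand-in for the default behaviour; NOT Kafka's actual hash of the serialized key.
    static <K> int defaultPartition(K key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Use the user-supplied partitioner if there is one, otherwise fall back to the default.
    // The value is always null: only the key is available (hence the Void value type).
    static <K> int route(String topic, K key, Partitioner<K, Void> userPartitioner, int numPartitions) {
        if (userPartitioner == null) {
            return defaultPartition(key, numPartitions);
        }
        return userPartitioner.partition(topic, key, null, numPartitions);
    }

    // Condition b): subscription records are keyed by the foreign key KO,
    // so they follow the partitioner of the right table B.
    static <KO> int subscriptionPartition(KO foreignKey, Partitioner<KO, Void> otherPartitioner, int n) {
        return route("subscription-topic", foreignKey, otherPartitioner, n);
    }

    // Condition a): response records are keyed by the left key K,
    // so they follow the partitioner of the left table A.
    static <K> int responsePartition(K key, Partitioner<K, Void> partitioner, int n) {
        return route("response-topic", key, partitioner, n);
    }
}
```

The correctness burden stays with the user: the partitioners passed in must be the ones actually used to write the two input topics.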



Guozhang



On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Victoria!

This is a great catch. As far as my memory serves, we
totally missed this case in the design of FK joins.

tl;dr: I'm wondering if it might be better to instead
introduce a way to specify the partitioning scheme when we
create a KTable and then just use it when we join, rather
than to specify it in the join itself.

I was initially surprised to see the proposal to add these
partitioners in the join operation itself rather than
inheriting them from the tables on both sides, but I
reviewed the Streams DSL and see that we _cannot_ inherit it
because almost all the time, the input KTables' partitioning
is not known!

It seems like the only times we specify a partitioner on a
DSL object are:
1. when we produce to an output topic via KStream#to or
KStream#through.
2. when we repartition via KStream#repartition

These are both specifying the partitioner to use in output
operations (i.e., we are telling Streams the partition *to
use*); there's currently only one situation in which we have
to _inform_ Streams about the partitioning of a KTable or
KStream:
3. when we issue a key-based query via IQ, we need to know
the partitioner, so the IQ interface allows us to pass in a
custom partitioner with the query.

This is a bit weird. Taking a step back, the partitioning
scheme is a property of the table itself, not of the query
(or the join). Specifying a table property as part of a
query (or join) on the table seems to be an indication that
the KTable definition itself is lacking something.

Perhaps a more comprehensive approach would be to add an
optional StreamPartitioner parameter to StreamsBuilder#table.
Then, queries and joins (and anything else that is
partitioner-sensitive, now and in the future) could simply
inherit the partitioner via the topology.
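A toy sketch of what "inherit the partitioner via the topology" could mean, assuming each table node simply remembers how its records are partitioned and key-preserving children copy it. `TableNode`, `source`, and `deriveKeyPreserving` are hypothetical names for illustration; none of them exist in Kafka Streams, and `Partitioner` only mirrors the shape of `StreamPartitioner`.

```java
import java.util.Optional;

// Hypothetical sketch of partitioner inheritance; none of these types exist in Kafka Streams.
final class InheritedPartitionerSketch {

    // Local stand-in mirroring the shape of StreamPartitioner#partition(...).
    interface Partitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // A node in a toy topology that remembers how its records are partitioned.
    static final class TableNode<K> {
        final String name;
        final Optional<Partitioner<K, Void>> partitioner; // empty: unknown, assume default

        private TableNode(String name, Optional<Partitioner<K, Void>> partitioner) {
            this.name = name;
            this.partitioner = partitioner;
        }

        // Source table: the user optionally declares how the input topic is partitioned,
        // similar in spirit to an optional partitioner on StreamsBuilder#table.
        static <K> TableNode<K> source(String topic, Partitioner<K, Void> partitioner) {
            return new TableNode<>(topic, Optional.ofNullable(partitioner));
        }

        // Key-preserving operations (filters, value mappings, ...) keep the parent's
        // partitioning, so a downstream join or query can read it here instead of
        // taking it as a parameter.
        TableNode<K> deriveKeyPreserving(String childName) {
            return new TableNode<>(childName, partitioner);
        }
    }
}
```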

WDYT?

Thanks,
-John

On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
Hi Victoria,

Thanks for the KIP; this is a beneficial addition.

I'm a +1 on the KIP and the changes made:

    - Using a config object TableJoined
    - Limiting the static methods to two

I have an additional "wild" thought about rolling the `Function<V, KO> foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes, byte[]>>` into TableJoined to align and reduce the number of KTable join methods. But I don't have a strong opinion on this; I am curious to see what others think about this possibility.

-Bill

On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for updating the KIP.

One nit:

The existing methods which accept Named will be marked for deprecation in 4.0.

We can skip `in 4.0`: (1) the next release will be 3.1 (not 4.0), and (2) a KIP could always slip into a future release.
a KIP could always slip into a future release.


About `TableJoined`: it seems you propose to add static methods for all possible parameter combinations. We usually try to avoid this to keep the number of methods low; if we add too many methods, it defeats the purpose of using a "builder-like" config object.

To me, it seems sufficient to only have two static methods:

as(final String name);

and

with(final StreamPartitioner<K, Void> partitioner,
      final StreamPartitioner<KO, Void> otherPartitioner);

The second one should allow passing in `null` to set only one of the two partitioners.
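A minimal sketch of a config object limited to these two static factories, with the remaining combinations covered by instance builders. `TableJoinedSketch` and its members are modelled on this email, not copied from a released class; `Partitioner` only mirrors the shape of `StreamPartitioner`.

```java
// Hypothetical sketch of a config object limited to the two proposed static factories.
final class TableJoinedSketch<K, KO> {

    // Local stand-in mirroring the shape of StreamPartitioner#partition(...).
    interface Partitioner<T, U> {
        Integer partition(String topic, T key, U value, int numPartitions);
    }

    final Partitioner<K, Void> partitioner;       // null: keep the default
    final Partitioner<KO, Void> otherPartitioner; // null: keep the default
    final String name;                            // null: use a generated name

    private TableJoinedSketch(Partitioner<K, Void> partitioner,
                              Partitioner<KO, Void> otherPartitioner,
                              String name) {
        this.partitioner = partitioner;
        this.otherPartitioner = otherPartitioner;
        this.name = name;
    }

    // Static factory 1: only set the name.
    static <K, KO> TableJoinedSketch<K, KO> as(String name) {
        return new TableJoinedSketch<>(null, null, name);
    }

    // Static factory 2: set both partitioners; pass null for one to leave it at the default.
    static <K, KO> TableJoinedSketch<K, KO> with(Partitioner<K, Void> partitioner,
                                                 Partitioner<KO, Void> otherPartitioner) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, null);
    }

    // Other combinations go through instance builders instead of more static methods.
    TableJoinedSketch<K, KO> withName(String newName) {
        return new TableJoinedSketch<>(partitioner, otherPartitioner, newName);
    }
}
```

For example, `TableJoinedSketch.with(leftPartitioner, null).withName("fk-join")` sets only the left partitioner plus a name, without a dedicated static method for that combination.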

Curious to hear what others think.


-Matthias

On 9/20/21 8:27 PM, Victoria Xia wrote:
Hi Matthias,

Thanks for having a look at the KIP! I've updated it with your suggestion to introduce a new `TableJoined` object with partitioners of type `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`, and to deprecate the existing FK join methods which accept a `Named` object accordingly. I agree it makes sense to keep the number of join interfaces smaller.

Thanks,
Victoria

On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP Victoria.

As you pointed out on the Jira ticket, using `<K,V>` and `<KO,VO>` as partitioner types does not really work, because we don't have access to the right value on the left side, nor do we have access to the left value on the right-hand side. I like your idea to use `Void` as the value types to make it clear to users that partitioning must be done on the key only.

For the proposed public API change, I would propose not to pass the partitioners directly, but to introduce a config object (similar to `Joined` for stream-table joins, and `StreamJoined` for stream-stream joins). This new object could also implement `NamedOperation` and thus replace `Named`. To this end, we would deprecate the existing methods using `Named` and replace them with the new methods. The net benefit is that we don't get more overloads (after we remove the deprecated ones).
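A minimal sketch of the deprecation pattern described here: the old `Named` overload forwards to the new config-object overload, so once the deprecated method is removed the overload count is unchanged. `Named`, `NamedOperation`, `TableJoined`, and `Table` below are local stand-ins that only mirror the shape of the real types, and the join is reduced to a single `other` argument.

```java
// Hypothetical stand-ins; they mirror the shape of the Kafka Streams types, nothing more.
final class DeprecationSketch {

    // Mirrors NamedOperation: config objects that can carry a processor name.
    interface NamedOperation<T extends NamedOperation<T>> {
        T withName(String name);
    }

    static final class Named {
        final String name;
        private Named(String name) { this.name = name; }
        static Named as(String name) { return new Named(name); }
    }

    // The new config object carries the name itself, so it can replace Named.
    static final class TableJoined implements NamedOperation<TableJoined> {
        final String name;
        private TableJoined(String name) { this.name = name; }
        static TableJoined as(String name) { return new TableJoined(name); }

        @Override
        public TableJoined withName(String name) { return new TableJoined(name); }
    }

    // Stand-in for KTable; the real FK join arguments are omitted.
    static final class Table {
        // New overload: the one that remains after the deprecated overload is removed.
        String join(String other, TableJoined tableJoined) {
            return "join(" + other + ", name=" + tableJoined.name + ")";
        }

        // Old overload: kept for compatibility and forwarded to the new one.
        @Deprecated
        String join(String other, Named named) {
            return join(other, TableJoined.as(named.name));
        }
    }
}
```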

Not sure what we want to call the new object. Maybe `TableJoined`, in alignment with `StreamJoined`?


-Matthias

On 9/15/21 3:36 PM, Victoria Xia wrote:
Hi,

I've opened a small KIP for adding Kafka Streams support for foreign key joins on tables with custom partitioners:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins

Feedback appreciated. Thanks!

- Victoria