Your point about the IQ problem is an interesting one. I missed that the "new key" would be a "superkey", and thus it should always be possible to compute the original key from the superkey. (As a matter of fact, for a windowed table the windowed key is also a superkey...)
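To make the superkey point concrete, here is a minimal, self-contained Java sketch. It is not Kafka Streams code: `SuperkeyRouting`, `UserEvent` and `partitionForSuperkey` are hypothetical names, and `partitionOf` is a stand-in that uses `hashCode()` instead of the murmur2 hash that Kafka's default partitioner applies to serialized keys. Given a reverse mapper from superkey to original key, a lookup can be routed to the partition the record already lives in.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch, not a Kafka Streams API: route a lookup for a
// "superkey" by first mapping it back to the original partition key.
final class SuperkeyRouting {

    // Stand-in partitioner (assumption): Kafka's default partitioner hashes the
    // serialized key with murmur2; hashCode() keeps this example self-contained.
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Example superkey: the original key ("userId") plus an extra component.
    static final class UserEvent {
        final String userId;    // original partition key
        final String eventType; // component added by the key-changing step

        UserEvent(String userId, String eventType) {
            this.userId = userId;
            this.eventType = eventType;
        }
    }

    // Partition holding records with superkey sk, found via the reverse mapper.
    static <SK, K> int partitionForSuperkey(SK sk, Function<SK, K> reverseMapper, int numPartitions) {
        return partitionOf(reverseMapper.apply(sk), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        Function<UserEvent, String> toOriginalKey = e -> e.userId;
        UserEvent sk = new UserEvent("alice", "click");
        // Same partition the record had before the key change, so this prints true
        System.out.println(
            partitionForSuperkey(sk, toOriginalKey, numPartitions) == partitionOf("alice", numPartitions));
    }
}
```

Nothing in this sketch verifies that the mapper really inverts the key change; that is exactly the fragility Ivan points out for the "reverse mapping" approach.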

I am not sure we need to follow the "use the head" idea, or that we need a "CompositeKey" interface. It seems we could just allow any type and stay agnostic to it:

    KStream<K, V> stream = ...
    KStream<SK, V> stream2 = stream.selectKey(/*set superkey*/)
                                   .markAsPartitioned();

We would only need a `Function<SK, K>`, without any restriction on the types, to map the "superkey" back to the original "partition key". Do you propose to provide this "reverse mapper" via the `markAsPartitioned()` method (or config object), or via the IQ methods? Not sure which one is better.

However, I am not sure it would solve the join problem, at least not easily: if one has two `KStream<Tuple, ...>`, and one is properly partitioned by `Tuple` while the other one is only "marked as partitioned", the join would just fail: records with equal `Tuple` keys could end up in different partitions of the two inputs and would never meet. The same holds for a stream-table join. The only fix would be to repartition anyway, effectively ignoring the "user hint", but that seems to defeat the purpose. Again, I would argue that it is ok not to handle this case, and to leave it as the user's responsibility not to mess it up.

-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
> Hi Matthias and Sophie!
>
> ==1. markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>
> I don't have a strong opinion here; both Sophie's and Matthias' points look convincing to me.
>
> I think we should estimate the following: what is the probability that we will ever need to extend `selectKey` etc. with a config for purposes other than `markAsPartitioned`?
>
> If we find this probability high, then it's just a refactoring to deprecate the overloads with `Named` and introduce overloads with dedicated configs, and we should do it this way.
>
> If it's low or zero, maybe it's better not to mess with the existing APIs and to introduce a single `markAsPartitioned()` method, which itself can easily be deprecated if we find a better solution later!
>
>
> ==2. The IQ problem==
>
>> it then has to be the case that
>>
>> Partitioner.partition(key) == Partitioner.partition(map(key))
>
> Sophie, you got this wrong, and Matthias already explained why.
>
> The actual required property for the mapping function is:
>
> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>
> or, by the contraposition law,
>
> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2))
>
> (look at the whiteboard photo that I attached to the KIP).
>
> There is a big class of such mappings: key -> Tuple(key, anyValue). This is actually what we often do before aggregation, and this mapping does not require a repartition.
>
> But of course we can extract the original key from Tuple(key, anyValue), and this can save IQ and joins!
>
> This is what I mean when I talk about the 'CompositeKey' idea.
>
> We can do the following:
>
> 1. implement a 'partitioner wrapper' that recognizes tuples (CompositeKeys) and uses only the 'head' to calculate the partition,
>
> 2. implement
>
>     selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>         selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>         //MARK_AS_PARTITIONED call here,
>         //but this call is an implementation detail and we do not expose
>         //markAsPartitioned publicly!
>     }
>
> WDYT? (it's just a brainstorming idea)
>
> On 09.08.2021 2:38, Matthias J. Sax wrote:
>> Hi,
>>
>> I originally had a similar thought about `markAsPartitioned()` vs extending `selectKey()` et al. with a config. While I agree that it might be conceptually cleaner to use a config object, I did not propose it, as the API impact (deprecating stuff and adding new stuff) is quite big... If we think it's an acceptable price to pay, I am ok with it though.
>>
>> I also think that `markAsPartitioned()` could actually be categorized as an operator... We don't expose it in the API as a first-class citizen atm, but in fact we have two types of `KStream`: a "PartitionedKStream" and a "NonPartitionedKStream". Thus, `markAsPartitioned()` can be seen as a "cast operator" that converts the one into the other.
>>
>> I also think that the raised concern about "forgetting to remove `markAsPartitioned()`" might not be very strong. Even if you have different places in the code that link stuff together, the two calls in e.g. `selectKey().markAsPartitioned()` must always go together. If some other place in the code gets a `KStream` passed as input, it would be "invalid" to blindly call `markAsPartitioned()`, as you don't know anything about the upstream code. Of course, it requires some "coding discipline" to follow this pattern... Also, users can shoot themselves in the foot with the config object pattern, too, if they want: if you get a `KStream` passed in, you can skip repartitioning via `selectKey((k,v) -> k, Config.markAsPartitioned())`. Thus, I still slightly prefer to add `markAsPartitioned()` as an operator.
>>
>> (Maybe we should have exposed a `PartitionedKStream` as a first-class object to begin with... Hard to introduce now, I guess...)
>>
>>
>> The concern about IQ is interesting; I did not realize this impact. Thanks for bringing it up.
>>
>>> a repartition would be a no-op, ie that the stream (and its partitioning) would be the same whether or not a repartition is inserted. For this to be true, it then has to be the case that
>>>
>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>
>> @Sophie: I don't think this statement is correct. A `markAsPartitioned()` only means that the existing partitioning ensures that all messages with the same new key are still in the same partition. Ie, it cannot happen that two equal new keys are in different partitions.
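The property Matthias describes (equal new keys never live in different partitions) is the one Ivan writes formally as map(k1) = map(k2) => partition(k1) = partition(k2). A minimal Java sketch that checks it over a sample of keys could look like this; `PartitionPreservationCheck` and `preservesPartitioning` are hypothetical helpers, and `partition` is again a `hashCode()`-based stand-in for the real partitioner. Note that the property says nothing about `partition(map(k)) == partition(k)`.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Sketch: check, over a sample of keys, the property a key mapping must have
// for markAsPartitioned() to be safe:
//   map(k1) == map(k2)  =>  partition(k1) == partition(k2)
final class PartitionPreservationCheck {

    // Stand-in partitioner (assumption, not Kafka's murmur2-based default).
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Returns true if no two sampled keys that map to the same new key
    // live in different partitions.
    static <K, NK> boolean preservesPartitioning(List<K> sample, Function<K, NK> map, int numPartitions) {
        for (K k1 : sample) {
            for (K k2 : sample) {
                boolean sameNewKey = Objects.equals(map.apply(k1), map.apply(k2));
                if (sameNewKey && partition(k1, numPartitions) != partition(k2, numPartitions)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        // key -> (key, tail): distinct keys stay distinct, so the property holds,
        // even though partition(map(k)) need not equal partition(k). Prints true.
        System.out.println(preservesPartitioning(keys, k -> List.of(k, "tail"), 2));
        // Collapsing all keys into one new key mixes partitions. Prints false.
        System.out.println(preservesPartitioning(keys, k -> "constant", 2));
    }
}
```

A sample-based check like this can only find violations, not prove their absence, which is one more reason the thread leans towards treating `markAsPartitioned()` as the user's responsibility.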
>>
>> However, if you physically repartitioned on the new key using the same hash function as for the old key, there would be no guarantee that the same partitions are picked... And that is why IQ breaks downstream.
>>
>> Btw: using `markAsPartitioned()` could also be an issue for joins, for the same reason... I want to call out that the Jira tickets that raised the concern about unnecessary repartitioning are about downstream aggregations, though...
>>
>> Last but not least: we actually have a similar situation for windowed aggregations. The result of a window aggregation is partitioned by the "plain key": if we wrote the result into a topic using the same partitioning function on the windowed key, we would write to different partitions... (I guess it has never been an issue so far, as we don't have KIP-300 in place yet...)
>>
>> It's also not an issue for IQ, because we know the plain key, and thus can route to the right task.
>>
>>
>> About a solution: I think it might be ok to say we don't need to solve this problem, but that it's the user's responsibility to take IQ into account. Ie, if they want to use IQ downstream, they need to repartition: for this case, repartitioning is _NOT_ unnecessary... The same argument seems to apply to the join case I mentioned above. Given that `markAsPartitioned()` is an advanced feature, it seems ok to leave it to the user to use it correctly (we should of course call it out in the docs!).
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>> Before I dive into the question of IQ and the approaches you proposed, can you just elaborate on the problem itself? By definition, the `markAsPartitioned` flag means that a repartition would be a no-op, ie that the stream (and its partitioning) would be the same whether or not a repartition is inserted. For this to be true, it then has to be the case that
>>>
>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>
>>> The left-hand side of the above is precisely how we determine the partition number that a key belongs to when using IQ. It shouldn't matter whether the user is querying a key in a store upstream of the key-changing operation or a mapped key downstream of it: either way we just apply the given Partitioner.
>>>
>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>> for where this happens.
>>>
>>>
>>> If we're concerned that users might try to abuse the new `markAsPartitioned` feature, or accidentally misuse it, then we could add a runtime check that applies the Partitioner associated with that subtopology to the key being processed and to the mapped key, to assert that they do indeed match. Imo this is probably overkill, just putting it out there.
>>>
>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>
>>>> Hi Sophie,
>>>>
>>>> thanks for your reply! So your proposal is:
>>>>
>>>> 1) For each key-changing operation, deprecate the existing overloads that accept a Named, and replace them with overloads that take an operator-specific config object.
>>>> 2) Add a `markAsPartitioned` flag to these configs.
>>>>
>>>> IMO, this looks much better than the original proposal; I like it very much and I think I will rewrite the KIP soon. I absolutely agree with your points. Repartition logic is not a part of the public contract, and it's much better to give it correct hints instead of telling it explicitly what it should do.
>>>>
>>>> ...
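The config-object proposal summarized above can be sketched in plain Java. Everything here is hypothetical: `KeyChangeConfig` is not a Kafka Streams class, and `needsRepartition` is a toy stand-in for the DSL's internal repartition logic, which is not part of the public contract. What it illustrates is Sophie's argument: if every key-changing operation carries its own hint, Streams can still decide by itself whether a downstream stateful operation needs a repartition.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical config object for key-changing operations (selectKey, map, ...),
// in the spirit of Grouped/Joined: it carries both the processor name
// (today's Named parameter) and the proposed "preserves partitioning" hint.
final class KeyChangeConfig {
    private final String name;                   // null means "auto-generated name"
    private final boolean preservesPartitioning; // the proposed markAsPartitioned flag

    private KeyChangeConfig(String name, boolean preservesPartitioning) {
        this.name = name;
        this.preservesPartitioning = preservesPartitioning;
    }

    static KeyChangeConfig defaults() {
        return new KeyChangeConfig(null, false);
    }

    static KeyChangeConfig as(String name) {
        return new KeyChangeConfig(name, false);
    }

    KeyChangeConfig withName(String newName) {
        return new KeyChangeConfig(newName, preservesPartitioning);
    }

    KeyChangeConfig markAsPartitioned() {
        return new KeyChangeConfig(name, true);
    }

    Optional<String> name() {
        return Optional.ofNullable(name);
    }

    boolean preservesPartitioning() {
        return preservesPartitioning;
    }

    // Toy planner rule: a stateful operation needs a repartition if ANY
    // key change since the last repartition did not preserve partitioning.
    static boolean needsRepartition(List<KeyChangeConfig> keyChangesSinceLastRepartition) {
        for (KeyChangeConfig c : keyChangesSinceLastRepartition) {
            if (!c.preservesPartitioning()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Sophie's scenario: an early key change needs a repartition, the last one does not.
        List<KeyChangeConfig> chain = List.of(
            KeyChangeConfig.as("re-key-by-user"),
            KeyChangeConfig.as("add-session-tail").markAsPartitioned());
        System.out.println(needsRepartition(chain)); // true
        System.out.println(needsRepartition(List.of(KeyChangeConfig.defaults().markAsPartitioned()))); // false
    }
}
```

Under this toy rule, a hint on the last key change cannot cancel a repartition that an earlier key change made necessary, which is the failure mode Sophie describes for a stream-level operator.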
>>>>
>>>> Since we're generating such bright ideas, maybe we should also brainstorm the interactive query problem?
>>>>
>>>> The problem is that interactive queries will not work properly when `markAsPartitioned` is used. Although the original key and the mapped key will be in the same partition, we will no longer be able to determine this partition given the mapped key only.
>>>>
>>>> The possible approaches are:
>>>>
>>>> 1) Give up and don't use interactive queries together with `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>
>>>> 2) Maybe we should ask the user to provide a 'reverse mapping' that allows IQ to restore the original key in order to choose the correct partition. We can place this mapping in our new configuration object. Of course, there is no way for KStreams to verify at compile time or startup time that this function is actually the reverse mapping that extracts the old key from the new one. Users will forget to provide this function. Users will provide wrong functions. This all looks too fragile.
>>>>
>>>> 3) Maybe there can be a completely different approach. Let's introduce a new entity: composite keys, consisting of a "head" and a "tail". The partition for a composite key is calculated based on its 'head' value only. If we provide a key mapping of the form key -> CompositeKey(key, tail), then it's obvious that we do not need a repartition. When an interactive query needs to determine the partition for a CompositeKey, it just extracts its head and calculates the correct partition.
>>>>
>>>> We can select a CompositeKey before groupByKey() and aggregation operations, and this will not involve a repartition. And IQ will work.
>>>>
>>>> Is it too daring an idea? WDYT? My concern: will it cover all the cases where we want to choose a different key but also avoid a repartition?
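Approach 3, together with the 'partitioner wrapper' and `selectCompositeKey` brainstorm from Ivan's 8/9 email, can be sketched without any Kafka dependency. `CompositeKey`, `headAwarePartition` and `selectCompositeKey` are hypothetical names, and `basePartition` is a `hashCode()`-based stand-in for the real partitioner. The sketch shows why IQ would keep working: the partition of `CompositeKey(key, tail)` is, by construction, the partition of `key`.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Sketch of the "CompositeKey" idea: a key with a head (the original
// partition key) and a tail, plus a partitioner wrapper that only looks
// at the head. None of these classes exist in Kafka Streams.
final class CompositeKeySketch {

    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
        }

        @Override
        public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // Stand-in for the underlying partitioner (assumption).
    static int basePartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Partitioner wrapper: composite keys are routed by their (innermost) head,
    // so key -> CompositeKey(key, tail) never moves a record to another partition.
    static int headAwarePartition(Object key, int numPartitions) {
        Object effective = key;
        while (effective instanceof CompositeKey) {
            effective = ((CompositeKey<?, ?>) effective).head;
        }
        return basePartition(effective, numPartitions);
    }

    // Per-record counterpart of the proposed selectCompositeKey(): the new key
    // is the old key plus a tail computed from key and value.
    static <K, V, T> CompositeKey<K, T> selectCompositeKey(K key, V value, BiFunction<K, V, T> tailSelector) {
        return new CompositeKey<>(key, tailSelector.apply(key, value));
    }

    public static void main(String[] args) {
        CompositeKey<String, String> ck = selectCompositeKey("alice", "page=/home", (k, v) -> v);
        // An IQ lookup for ck lands on the partition of its head: prints true
        System.out.println(headAwarePartition(ck, 6) == headAwarePartition("alice", 6));
    }
}
```

One design consequence worth noting: a `CompositeKey`-keyed stream would only be co-partitioned with another stream if both sides are partitioned by the same head-aware partitioner, so the join concern raised in the thread does not disappear automatically.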
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>>
>>>>
>>>> On 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>> Hey Ivan
>>>>>
>>>>> I completely agree that adding it as a config to Grouped/Joined/etc isn't much better; I was just listing it for completeness. I would prefer to make it a configuration of the key-changing operation itself. That's what I meant by
>>>>>
>>>>>     "a better alternative might be to introduce this ... to the config object of the operator that's actually doing the key changing operation"
>>>>>
>>>>>
>>>>> I personally believe this is the semantically "correct" way to approach this, since "preserves partitioning" or "does not preserve partitioning" is a property of a key-changing operation and not an operation on the stream itself. Also, this way the user need only tell Streams which operations do or do not preserve the partitioning, and Streams can figure out where to insert a repartition in the topology as it does today.
>>>>>
>>>>> Otherwise, we're rendering this particularly useful feature of the DSL (automatic repartitioning) pretty much useless, since the user now has to figure out whether a repartition is needed. On top of that, they need to have some understanding of where and when this internal automatic repartitioning logic is going to insert that repartition in order to cancel it in the appropriate place. Which is pretty unfortunate, since that logic is not part of the public contract: it can change at any time, for example as it did when we introduced the repartition merging optimization.
>>>>>
>>>>> All that said, those are valid concerns regarding the expansion of the API's surface area. Since none of the key-changing operations currently has a config object like some other operations (for example Grouped or Consumed, etc), this would double the number of overloads. But maybe this is a good opportunity to fix that problem, rather than keep digging ourselves into holes by trying to work around it.
>>>>>
>>>>> It looks like all of those key-changing operations have two overloads at the moment: one with no parameters beyond the operation itself (eg KeyValueMapper for #selectKey) and the other with an additional Named parameter, which is itself another kind of configuration. What if we instead deprecate the existing overloads that accept a Named, and replace them with overloads that take an operator-specific config object, like we do elsewhere (eg Grouped for #groupByKey)? Then we can have both Named and this `markAsPartitioned` flag be part of the general config object, which (a) does not expand the API surface area at all in this KIP, and (b) also protects future KIPs from needing to have this same conversation over and over, because we can now stick any additional operator properties into that same config object.
>>>>>
>>>>> WDYT? By the way, the above idea (introducing a single config object to wrap all operator properties) was also raised by John Roesler a while back. Let's hope he hasn't changed his mind since then :)
>>>>>
>>>>>
>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Concerning the naming: I like `markAsPartitioned`, because it describes what this operation is actually doing!
>>>>>>
>>>>>> Hi Sophie,
>>>>>>
>>>>>> I see the concern about poor code cohesion. We declare the key mapping in one place of the code, then later, in another place, we say "markAsPartitioned()". When we change the code six months later, we might forget to remove markAsPartitioned(), especially if it's placed in another method or class. But I don't understand why you propose to include this config in Grouped/Joined/StreamJoined, because from this point of view it's not a better solution.
>>>>>>
>>>>>> The best approach regarding cohesion might be to add an extra 'preservePartition' flag to every key-changing operation, that is
>>>>>>
>>>>>> 1) selectKey
>>>>>> 2) map
>>>>>> 3) flatMap
>>>>>> 4) transform
>>>>>> 5) flatTransform
>>>>>>
>>>>>> in order to tell whether the provided mapping requires a repartition or not. Indeed, this is a property of the mapping operation, not of the grouping one! BTW: the idea of adding an extra parameter to `selectKey` was once coined by John Roesler.
>>>>>>
>>>>>> Arguments in favour of this approach: 1) better code cohesion from the point of view of the user, 2) 'smarter' code (the decision is taken depending on metadata provided for all the upstream mappings), 3) overall safer for the user.
>>>>>>
>>>>>> Arguments against: an invasive KStreams API change, 5 more method overloads. Further on, when we add a new key-changing operation to KStream, we must add an overloaded version with 'preservePartition'. When we add a new overloaded version for an existing operation, we might actually need to add two or more overloaded versions. This will soon become a mess.
>>>>>>
>>>>>> I thought that since `markAsPartitioned` is intended for advanced users, they will use it with care. When you're in a position where every serialization/deserialization round matters for the latency, you're extremely careful with the topology, and you will not thoughtlessly add new key-changing operations without controlling how they are going to change the overall topology.
>>>>>>
>>>>>> By the way, if we later find a better solution, it's much easier to deprecate a single `markAsPartitioned` operation than 5 method overloads.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>> Do we really need a whole DSL operator for this? I think the original name for this operator, `cancelRepartition()`, is itself a sign that this is not an operation on the stream itself but rather a command/request to whichever operator would have otherwise triggered this repartition.
>>>>>>>
>>>>>>> What about instead adding a new field to the Grouped/Joined/StreamJoined config objects that signals them to skip the repartitioning?
>>>>>>>
>>>>>>> The one downside to this specific proposal is that you would then need to specify this for every stateful operation downstream of the key-changing operation. So a better alternative might be to introduce this `skipRepartition` field, or whatever we want to call it, to the config object of the operator that's actually doing the key-changing operation which is apparently preserving the partitioning.
>>>>>>>
>>>>>>> Imo this would be "safer" relative to the current proposal, as the user has to explicitly consider whether every key-changing operation is indeed preserving the partitioning. Otherwise you could code up a topology with several key-changing operations at the beginning which do require repartitioning. Then you get to the end of the topology, insert one final key-changing operation that doesn't, assume you can just cancel the repartition, and suddenly you're wondering why your results are all screwed up.
>>>>>>>
>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>
>>>>>>>> I think it's a good feature to give advanced users more control, and allow them to build more efficient applications.
>>>>>>>>
>>>>>>>> Not sure if I like the proposed name though (the good old "naming things" discussion :))
>>>>>>>>
>>>>>>>> Did you consider alternatives? What about
>>>>>>>>
>>>>>>>> - markAsPartitioned()
>>>>>>>> - markAsKeyed()
>>>>>>>> - skipRepartition()
>>>>>>>>
>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>>>
>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct` operator, which turned out to be a separate proposal.
>>>>>>>>>
>>>>>>>>> The proposal is quite trivial; however, we still might consider alternatives (see the 'Possible Alternatives' section).
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Ivan