Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.
Levani

> On 23. Apr 2022, at 02:43, Matthias J. Sax <mj...@apache.org> wrote:
>
> Ivan,
>
> are you still interested in this KIP? I think it would be a good addition.
>
>
> -Matthias
>
> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>> Your point about the IQ problem is an interesting one. I missed the
>> point that the "new key" would be a "superkey", and thus it should
>> always be possible to compute the original key from the superkey. (As a
>> matter of fact, for a windowed table the windowed key is also a superkey...)
>>
>> I am not sure if we need to follow the "use the head" idea or if we need
>> a "CompositeKey" interface. It seems we can just allow for any types and
>> be agnostic to them?
>>
>> KStream<K, V> stream = ...
>> KStream<SK, V> stream2 =
>>     stream.selectKey(/*set superkey*/)
>>           .markAsPartitioned();
>>
>> We only need a `Function<SK, K>`, without any restrictions on the type,
>> to map the "superkey" to the original "partition key"?
>>
>> Do you propose to provide the "reverse mapper" via the
>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>> Not sure which one is better.
>>
>> However, I am not sure if it would solve the join problem, at least not
>> easily: if one has two KStream<Tuple,...> and one is properly
>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>> the join would just fail. -- Similar for a stream-table join. -- The
>> only fix would be to do the re-partitioning anyway, effectively ignoring
>> the "user hint", but that seems to defeat the purpose. Again, I would
>> argue that it is ok not to handle this case, and to leave it to the
>> user not to mess it up.
>>
>> -Matthias
>>
>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>> Hi Matthias and Sophie!
>>>
>>> ==1. markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>>
>>> I don't have a strong opinion here; both Sophie's and Matthias' points
>>> look convincing to me.
>>>
>>> I think we should estimate the following: what is the probability that
>>> we will ever need to extend `selectKey` etc. with a config for
>>> purposes other than `markAsPartitioned`?
>>>
>>> If we find this probability high, then it's just a refactoring to
>>> deprecate the overloads with `Named` and introduce overloads with
>>> dedicated configs, and we should do it this way.
>>>
>>> If it's low or zero, maybe it's better not to mess with the existing
>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>> itself can easily be deprecated if we find a better solution later!
>>>
>>>
>>> ==2. The IQ problem==
>>>
>>>> it then has to be the case that
>>>
>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>
>>>
>>> Sophie, you got this wrong, and Matthias already explained why.
>>>
>>> The actual required property for the mapping function is:
>>>
>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>>
>>> or, by the contraposition law,
>>>
>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2))
>>>
>>>
>>> (look at the whiteboard photo that I attached to the KIP).
>>>
>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>> is actually what we often do before aggregation, and this mapping does
>>> not require a repartition.
>>>
>>> But of course we can extract the original key from Tuple(key, anyValue),
>>> and this can save IQ and joins!
>>>
>>> This is what I mean when I talk about the 'CompositeKey' idea.
>>>
>>> We can do the following:
>>>
>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>>
>>> 2. implement
>>>
>>> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>>>     selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>>>     //MARK_AS_PARTITIONED call here,
>>>     //but this call is an implementation detail and we do not expose
>>>     //markAsPartitioned publicly!
>>> }
>>>
>>> WDYT? (it's just a brainstorming idea)
>>>
>>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>>> Hi,
>>>>
>>>> I originally had a similar thought about `markAsPartitioned()` vs
>>>> extending `selectKey()` et al. with a config. While I agree that it
>>>> might be conceptually cleaner to use a config object, I did not propose
>>>> it, as the API impact (deprecating stuff and adding new stuff) is quite
>>>> big... If we think it's an acceptable price to pay, I am ok with it
>>>> though.
>>>>
>>>> I also think that `markAsPartitioned()` could actually be
>>>> categorized as an operator... We don't expose it in the API as a
>>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>>> one into the other.
>>>>
>>>> I also think the raised concern about "forgetting to remove
>>>> `markAsPartitioned()`" might not be very strong, though. If you have
>>>> different places in the code that link stuff together, a call to e.g.
>>>> `selectKey().markAsPartitioned()` must always go together. If you have
>>>> some other place in the code that gets a `KStream` passed as input, it
>>>> would be "invalid" to blindly call `markAsPartitioned()`, as you don't
>>>> know anything about the upstream code. Of course, it requires some
>>>> "coding discipline" to follow this pattern... Also, users can shoot
>>>> themselves in the foot with the config object pattern, too, if they
>>>> want: if you get a `KStream` passed in, you can skip repartitioning via
>>>> `selectKey((k,v) -> k, Config.markAsPartitioned())`.
>>>> -- Thus, I still slightly prefer to add `markAsPartitioned()` as an
>>>> operator.
>>>>
>>>> (Maybe we should have exposed a `PartitionedKStream` as a first-class
>>>> object to begin with... Hard to introduce now, I guess...)
>>>>
>>>>
>>>> The concern about IQ is interesting -- I did not realize this impact.
>>>> Thanks for bringing it up.
>>>>
>>>>> a repartition would be a no-op, ie that the stream (and its
>>>>> partitioning) would be the same whether or not a repartition is
>>>>> inserted. For this to be true, it then has to be the case that
>>>>>
>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>
>>>> @Sophie: I don't think this statement is correct. A `markAsPartitioned()`
>>>> only means that the existing partitioning ensures that all messages with
>>>> the same new key are still in the same partition. I.e., it cannot happen
>>>> that two equal new keys end up in different partitions.
>>>>
>>>> However, if you physically repartitioned on the new key using the
>>>> same hash function as for the old key, there is no guarantee that the
>>>> same partitions would be picked... And that is why IQ breaks downstream.
>>>>
>>>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>>>> the same reason... I want to call out that the Jira tickets that
>>>> raised the concern about unnecessary repartitioning are about downstream
>>>> aggregations, though...
>>>>
>>>> Last but not least: we actually have a similar situation for
>>>> windowed aggregations: the result of a window aggregation is partitioned
>>>> by the "plain key"; if we wrote the result into a topic using the same
>>>> partitioning function (applied to the windowed key), we would write to
>>>> different partitions... (I guess it was never an issue so far, as we
>>>> don't have KIP-300 in place yet...)
>>>>
>>>> It's also not an issue for IQ, because we know the plain key, and thus
>>>> can route to the right task.
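Ivan's "partitioner wrapper" idea (route a composite key by its head only) and Matthias's remark that IQ works for windowed results because the plain key is known can be sketched in plain Java. This is a minimal sketch under stated assumptions: `CompositeKey`, `HeadOnlyPartitioner` and `CompositeKeys.select` are hypothetical names, not Kafka Streams API, and the partitioner is a `hashCode`-based stand-in rather than Kafka's default partitioner.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical composite key: a "head" (the original partition key) and a
// "tail" (any extra component selected from the record).
final class CompositeKey<H, T> {
    final H head;
    final T tail;

    CompositeKey(H head, T tail) {
        this.head = head;
        this.tail = tail;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CompositeKey)) {
            return false;
        }
        CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
        return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(head, tail);
    }
}

// Hypothetical "partitioner wrapper": composite keys are routed by their head
// only; every other key falls through to the base hash unchanged.
final class HeadOnlyPartitioner {
    // Stand-in hash, NOT Kafka's default partitioner (murmur2 over serialized bytes).
    static int basePartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    static int partition(Object key, int numPartitions) {
        Object routingKey = key instanceof CompositeKey ? ((CompositeKey<?, ?>) key).head : key;
        return basePartition(routingKey, numPartitions);
    }
}

// Plain-Java analogue of the proposed selectCompositeKey(): (k, v) becomes
// CompositeKey(k, tail). The head is the old key, so the partition computed by
// HeadOnlyPartitioner cannot change and no repartition is needed.
final class CompositeKeys {
    static <K, V, T> CompositeKey<K, T> select(K key, V value, BiFunction<K, V, T> tailSelector) {
        return new CompositeKey<>(key, tailSelector.apply(key, value));
    }
}
```

The same property is what makes IQ work again: a lookup by `CompositeKey` hashes only the head, which is exactly the partition the original key was in, analogous to routing windowed results by the plain key.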
>>>>
>>>> About a solution: I think it might be ok to say we don't need to solve
>>>> this problem, and that it's the user's responsibility to take IQ into
>>>> account. I.e., if they want to use IQ downstream, they need to
>>>> repartition: for this case, repartitioning is _NOT_ unnecessary... The
>>>> same argument seems to apply to the join case I mentioned above. --
>>>> Given that `markAsPartitioned()` is an advanced feature, it seems ok to
>>>> leave it to the user to use it correctly (we should of course call it
>>>> out in the docs!).
>>>>
>>>> -Matthias
>>>>
>>>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>>>> Before I dive into the question of IQ and the approaches you proposed,
>>>>> can you just elaborate on the problem itself? By definition, the
>>>>> `markAsPartitioned` flag means that a repartition would be a no-op, ie
>>>>> that the stream (and its partitioning) would be the same whether or not
>>>>> a repartition is inserted. For this to be true, it then has to be the
>>>>> case that
>>>>>
>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>>
>>>>> The left-hand side of the above is precisely how we determine the
>>>>> partition number that a key belongs to when using IQ. It shouldn't
>>>>> matter whether the user is querying a key in a store upstream of the
>>>>> key-changing operation or a mapped key downstream of it -- either way
>>>>> we just apply the given Partitioner.
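The difference between Sophie's equality and the weaker co-location property Ivan and Matthias describe (map(k1) = map(k2) => partition(k1) = partition(k2)) can be checked on a finite sample of keys. This is an illustrative sketch only: `PartitioningProperties` and its methods are hypothetical, the partitioner is a `hashCode`-based stand-in, and a sample can refute either property but never prove it for all keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class PartitioningProperties {
    // Stand-in hash partitioner, not Kafka's default partitioner.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Models the mapping key -> Tuple(key, anyValue) from the thread.
    static List<Object> tuple(Object head, Object tail) {
        return Arrays.asList(head, tail);
    }

    // Co-location: map(k1) == map(k2) implies partition(k1) == partition(k2).
    // Returns false as soon as one mapped key comes from two different partitions.
    static <K, SK> boolean preservesCoLocation(List<K> keys, Function<K, SK> map, int numPartitions) {
        Map<SK, Integer> partitionOfMappedKey = new HashMap<>();
        for (K k : keys) {
            int p = partition(k, numPartitions);
            Integer previous = partitionOfMappedKey.putIfAbsent(map.apply(k), p);
            if (previous != null && previous != p) {
                return false;
            }
        }
        return true;
    }

    // Stronger condition: partition(k) == partition(map(k)) for every sampled k,
    // i.e. re-hashing the new key would pick the same partition.
    static <K, SK> boolean samePartitionAfterRehash(List<K> keys, Function<K, SK> map, int numPartitions) {
        for (K k : keys) {
            if (partition(k, numPartitions) != partition(map.apply(k), numPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```

With this stand-in hash and 8 partitions, `k -> tuple(k, 0L)` passes the co-location check but fails the stronger one ("a" is in partition 1, while the tuple ("a", 0) hashes to partition 0), which is why such a mapping is safe for a downstream aggregation yet breaks IQ routing by the new key. A coarser key such as `String::length` fails co-location, since "a" and "b" land in partitions 1 and 2 but map to the same new key.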
>>>>>
>>>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>>> for where this happens.
>>>>>
>>>>>
>>>>> If we're concerned that users might try to abuse the new
>>>>> `markAsPartitioned` feature, or accidentally misuse it, then we could
>>>>> add a runtime check that applies the Partitioner associated with that
>>>>> subtopology to the key being processed and to the mapped key result,
>>>>> to assert that they do indeed match. Imo this is probably overkill,
>>>>> just putting it out there.
>>>>>
>>>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <iponoma...@mail.ru.invalid>
>>>>> wrote:
>>>>>
>>>>>> Hi Sophie,
>>>>>>
>>>>>> thanks for your reply! So your proposal is:
>>>>>>
>>>>>> 1) For each key-changing operation, deprecate the existing overloads
>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>> operator-specific config object.
>>>>>> 2) Add a `markAsPartitioned` flag to these configs.
>>>>>>
>>>>>> IMO, this looks much better than the original proposal. I like it very
>>>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>>>> your points. Repartition logic is not a part of the public contract,
>>>>>> and it's much better to give it correct hints instead of telling it
>>>>>> explicitly what it should do.
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> Since we're generating such bright ideas, maybe we should also
>>>>>> brainstorm the interactive query problem?
>>>>>>
>>>>>> The problem is that interactive queries will not work properly when
>>>>>> `markAsPartitioned` is used. Although the original key and the mapped
>>>>>> key will be in the same partition, we will no longer be able to guess
>>>>>> this partition given the mapped key only.
>>>>>>
>>>>>> The possible approaches are:
>>>>>>
>>>>>> 1) Give up and don't use interactive queries together with
>>>>>> `markAsPartitioned`. This is what I assume for now. But can we do
>>>>>> better?
>>>>>>
>>>>>> 2) Maybe we should ask the user to provide a 'reverse mapping' that
>>>>>> will allow IQ to restore the original key in order to choose the
>>>>>> correct partition. We can place this mapping in our new configuration
>>>>>> object. Of course, there is no way for KStreams to verify at compile
>>>>>> time or startup time that this function is actually the reverse
>>>>>> mapping that extracts the old key from the new one. Users will forget
>>>>>> to provide this function. Users will provide wrong functions. This all
>>>>>> looks too fragile.
>>>>>>
>>>>>> 3) Maybe there can be a completely different approach. Let's introduce
>>>>>> a new entity -- composite keys, consisting of a "head" and a "tail".
>>>>>> The partition for a composite key is calculated based on its 'head'
>>>>>> value only. If we provide a key mapping in the form key ->
>>>>>> CompositeKey(key, tail), then it's obvious that we do not need a
>>>>>> repartition. When an interactive query needs to guess the partition
>>>>>> for a CompositeKey, it just extracts its head and calculates the
>>>>>> correct partition.
>>>>>>
>>>>>> We can select a CompositeKey before groupByKey() and aggregation
>>>>>> operations, and this will not involve a repartition. And IQ will work.
>>>>>>
>>>>>> Is it too daring an idea? WDYT? My concern: will it cover all the
>>>>>> cases where we want to choose a different key but also avoid a
>>>>>> repartition?
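Approach 2 above, and Matthias's later suggestion that IQ only needs a `Function<SK, K>` from the "superkey" back to the original partition key, can be sketched as follows. All names are hypothetical (`SuperKey`, `SuperKeyRouter`, `ReverseMappingCheck`), the partitioner is a `hashCode`-based stand-in, and the round-trip check only catches wrong reverse mappings on sampled records; as Ivan notes, nothing here can prove the function correct for every key.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical superkey: the original partition key plus an extra component
// (a window start here, mirroring the windowed-key remark in this thread).
final class SuperKey {
    final String key;
    final long windowStart;

    SuperKey(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SuperKey)) {
            return false;
        }
        SuperKey other = (SuperKey) o;
        return key.equals(other.key) && windowStart == other.windowStart;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart);
    }
}

// Hypothetical IQ-side router: applies the user's reverse mapping and then the
// upstream partitioner to the recovered key, instead of hashing the superkey.
final class SuperKeyRouter<SK, K> {
    private final Function<SK, K> toPartitionKey;
    private final int numPartitions;

    SuperKeyRouter(Function<SK, K> toPartitionKey, int numPartitions) {
        this.toPartitionKey = toPartitionKey;
        this.numPartitions = numPartitions;
    }

    // Stand-in hash partitioner, not Kafka's default partitioner.
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    int partitionFor(SK superKey) {
        return partitionOf(toPartitionKey.apply(superKey), numPartitions);
    }
}

final class ReverseMappingCheck {
    // Returns the first sampled key k with reverse(forward(k, v)) != k,
    // or null if every sampled record round-trips.
    static <K, V, SK> K firstMismatch(Map<K, V> sample,
                                      BiFunction<K, V, SK> forward,
                                      Function<SK, K> reverse) {
        for (Map.Entry<K, V> e : sample.entrySet()) {
            K original = e.getKey();
            K recovered = reverse.apply(forward.apply(original, e.getValue()));
            if (!Objects.equals(original, recovered)) {
                return original;
            }
        }
        return null;
    }
}
```

With this stand-in hash and 8 partitions, hashing `new SuperKey("a", 0)` directly picks partition 0, while `"a"` itself lives in partition 1; the router avoids that mismatch by always hashing `sk.key`.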
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ivan
>>>>>>
>>>>>>
>>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>>> Hey Ivan
>>>>>>>
>>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>>> isn't much better (I was just listing it for completeness), and that
>>>>>>> I would prefer to make it a configuration of the key-changing
>>>>>>> operation itself -- that's what I meant by
>>>>>>>
>>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>>> object of the operator that's actually doing the key changing
>>>>>>>> operation
>>>>>>>
>>>>>>>
>>>>>>> I personally believe this is the semantically "correct" way to
>>>>>>> approach this, since "preserves partitioning" or "does not preserve
>>>>>>> partitioning" is a property of a key-changing operation and not an
>>>>>>> operation on the stream itself. Also, this way the user need only tell
>>>>>>> Streams which operations do or do not preserve the partitioning, and
>>>>>>> Streams can figure out where to insert a repartition in the topology
>>>>>>> as it does today.
>>>>>>>
>>>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>>>> DSL -- automatic repartitioning -- pretty much useless, since the user
>>>>>>> now has to figure out whether a repartition is needed. On top of that,
>>>>>>> they need to have some understanding of where and when this internal
>>>>>>> automatic repartitioning logic is going to insert that repartition in
>>>>>>> order to cancel it in the appropriate place. Which is pretty
>>>>>>> unfortunate, since that logic is not part of the public contract: it
>>>>>>> can change at any time, for example as it did when we introduced the
>>>>>>> repartition merging optimization.
>>>>>>>
>>>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>>>> API's surface area.
>>>>>>> Since none of the key-changing operations currently has a config
>>>>>>> object like some other operations (for example Grouped or Consumed,
>>>>>>> etc.), this would double the number of overloads. But maybe this is a
>>>>>>> good opportunity to fix that problem, rather than keep digging
>>>>>>> ourselves into holes by trying to work around it.
>>>>>>>
>>>>>>> It looks like all of those key-changing operations have two overloads
>>>>>>> at the moment, one with no parameters beyond the operation itself
>>>>>>> (e.g. KeyValueMapper for #selectKey) and the other with an additional
>>>>>>> Named parameter, which is itself another kind of configuration. What
>>>>>>> if we instead deprecate the existing overloads that accept a Named,
>>>>>>> and replace them with overloads that take an operator-specific config
>>>>>>> object like we do elsewhere (e.g. Grouped for #groupByKey)? Then we
>>>>>>> can have both Named and this `markAsPartitioned` flag be part of the
>>>>>>> general config object, which (a) does not expand the API surface area
>>>>>>> at all in this KIP, and (b) also protects future KIPs from needing to
>>>>>>> have this same conversation over and over, because we can now stick
>>>>>>> any additional operator properties into that same config object.
>>>>>>>
>>>>>>> WDYT? By the way, the above idea (introducing a single config object
>>>>>>> to wrap all operator properties) was also raised by John Roesler a
>>>>>>> while back. Let's hope he hasn't changed his mind since then :)
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponoma...@mail.ru.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>>>> describes what this operation is actually doing!
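Sophie's proposal to replace the `Named` overloads with an operator-specific config object that carries both the name and the `markAsPartitioned` flag could look roughly like this. `KeyChangeConfig` and all of its methods are hypothetical names chosen for illustration, loosely following the static-factory plus `withX` style of existing config classes such as `Grouped` and `Named`; none of this is existing Kafka Streams API.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, immutable config object for key-changing operators
// (selectKey, map, flatMap, ...). Not part of the Kafka Streams API.
final class KeyChangeConfig {
    private final String name;                // what a Named carries today; null = auto-generated
    private final boolean markAsPartitioned;  // hint: the new key keeps records co-partitioned

    private KeyChangeConfig(String name, boolean markAsPartitioned) {
        this.name = name;
        this.markAsPartitioned = markAsPartitioned;
    }

    // Replacement for passing Named.as(name) to the old overload.
    static KeyChangeConfig as(String name) {
        return new KeyChangeConfig(Objects.requireNonNull(name), false);
    }

    // Matches the Config.markAsPartitioned() shape used in the thread.
    static KeyChangeConfig markAsPartitioned() {
        return new KeyChangeConfig(null, true);
    }

    KeyChangeConfig withName(String name) {
        return new KeyChangeConfig(Objects.requireNonNull(name), markAsPartitioned);
    }

    KeyChangeConfig withMarkAsPartitioned(boolean flag) {
        return new KeyChangeConfig(name, flag);
    }

    Optional<String> name() {
        return Optional.ofNullable(name);
    }

    boolean isMarkedAsPartitioned() {
        return markAsPartitioned;
    }
}
```

A call site might then read `stream.selectKey(mapper, KeyChangeConfig.as("select-user").withMarkAsPartitioned(true))`, where that `selectKey` overload is likewise hypothetical. Future operator properties would become further `withX` methods rather than new overloads, which is the point Sophie makes in (b).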
>>>>>>>>
>>>>>>>> Hi Sophie,
>>>>>>>>
>>>>>>>> I see the concern about poor code cohesion. We declare the key mapping
>>>>>>>> in one place in the code, then later in another place we say
>>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>>> might forget to remove markAsPartitioned(), especially if it's placed
>>>>>>>> in another method or class. But I don't understand why you propose to
>>>>>>>> include this config in Grouped/Joined/StreamJoined, because from this
>>>>>>>> point of view it's not a better solution?
>>>>>>>>
>>>>>>>> The best approach regarding cohesion might be to add an extra
>>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>>
>>>>>>>> 1) selectKey
>>>>>>>> 2) map
>>>>>>>> 3) flatMap
>>>>>>>> 4) transform
>>>>>>>> 5) flatTransform
>>>>>>>>
>>>>>>>> in order to tell whether the provided mapping requires a repartition
>>>>>>>> or not. Indeed, this is a property of the mapping operation, not of
>>>>>>>> the grouping one! BTW: the idea of adding an extra parameter to
>>>>>>>> `selectKey` was once coined by John Roesler.
>>>>>>>>
>>>>>>>> Arguments in favour of this approach: 1) better code cohesion from the
>>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>>> overall safer for the user.
>>>>>>>>
>>>>>>>> Arguments against: an invasive KStreams API change, 5 more method
>>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>>> When we add a new overloaded version of an existing operation, we
>>>>>>>> might actually need to add two or more overloaded versions. This will
>>>>>>>> soon become a mess.
>>>>>>>>
>>>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>>>> users, they will use it with care.
>>>>>>>> When you're in a position where every
>>>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>>>> extremely careful with the topology, and you will not thoughtlessly
>>>>>>>> add new key-changing operations without controlling how they are
>>>>>>>> going to change the overall topology.
>>>>>>>>
>>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>>>>>> overloads.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>>
>>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>>> Do we really need a whole DSL operator for this? I think the original
>>>>>>>>> name for this operator -- `cancelRepartition()` -- is itself a sign
>>>>>>>>> that this is not an operation on the stream itself but rather a
>>>>>>>>> command/request to whichever operator would otherwise have triggered
>>>>>>>>> this repartition.
>>>>>>>>>
>>>>>>>>> What about instead adding a new field to the
>>>>>>>>> Grouped/Joined/StreamJoined config objects that signals them to skip
>>>>>>>>> the repartitioning?
>>>>>>>>>
>>>>>>>>> The one downside to this specific proposal is that you would then need
>>>>>>>>> to specify this for every stateful operation downstream of the
>>>>>>>>> key-changing operation. So a better alternative might be to introduce
>>>>>>>>> this `skipRepartition` field, or whatever we want to call it, to the
>>>>>>>>> config object of the operator that's actually doing the key-changing
>>>>>>>>> operation which is apparently preserving the partitioning.
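Sophie's argument for attaching the hint to the key-changing operation rather than to the stream can be illustrated with a toy model of how a DSL might track a pending repartition. This is an assumption-laden sketch, not a description of Kafka Streams internals: `ToyStream`, its boolean-flag `selectKey`, and `groupByKeyInsertsRepartition` are hypothetical.

```java
// Toy model, not Kafka Streams internals: each stream value records whether a
// downstream stateful operator (e.g. groupByKey) would need a repartition step.
final class ToyStream {
    private final boolean repartitionRequired;

    private ToyStream(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source() {
        return new ToyStream(false);
    }

    // Per-operation hint (config-object style): only this key change is declared
    // partition-preserving, so a requirement created further upstream is kept.
    ToyStream selectKey(boolean preservesPartitioning) {
        return new ToyStream(repartitionRequired || !preservesPartitioning);
    }

    // Stream-level cast (original proposal): clears the requirement regardless
    // of which upstream key change created it.
    ToyStream markAsPartitioned() {
        return new ToyStream(false);
    }

    boolean groupByKeyInsertsRepartition() {
        return repartitionRequired;
    }
}
```

In the scenario Sophie describes, two partition-breaking key changes followed by one partition-preserving change still yield a repartition under the per-operation hint, whereas appending `markAsPartitioned()` to the same chain silently drops a repartition that was actually needed.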
>>>>>>>>>
>>>>>>>>> Imo this would be "safer" relative to the current proposal, as the
>>>>>>>>> user has to explicitly consider whether every key-changing operation
>>>>>>>>> is indeed preserving the partitioning. Otherwise you could code up a
>>>>>>>>> topology with several key-changing operations at the beginning which
>>>>>>>>> do require repartitioning. Then you get to the end of the topology,
>>>>>>>>> insert one final key-changing operation that doesn't, assume you can
>>>>>>>>> just cancel the repartition, and suddenly you're wondering why your
>>>>>>>>> results are all screwed up.
>>>>>>>>>
>>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>>
>>>>>>>>>> I think it's a good feature to give advanced users more control, and
>>>>>>>>>> allow them to build more efficient applications.
>>>>>>>>>>
>>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>>> things" discussion :))
>>>>>>>>>>
>>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>>
>>>>>>>>>> - markAsPartitioned()
>>>>>>>>>> - markAsKeyed()
>>>>>>>>>> - skipRepartition()
>>>>>>>>>>
>>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>>>>>
>>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>>
>>>>>>>>>>> The proposal is quite trivial; however, we still might consider
>>>>>>>>>>> alternatives (see the 'Possible Alternatives' section).
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Ivan
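Matthias's remark (9 August) that `markAsPartitioned()` behaves like a "cast operator" between a "PartitionedKStream" and a "NonPartitionedKStream" can be illustrated as a type-state sketch. All types here are hypothetical toys that carry no records; the point is only that grouping exists solely on the partitioned type, so after a key change the code must either repartition or explicitly cast, and the model counts how many repartition topics each path creates.

```java
import java.util.function.Function;

// Toy type-state model; all names are hypothetical. Each stream value only
// tracks how many repartition topics were created so far.
interface ToyKStream<K> {
    <K2> NonPartitionedToyKStream<K2> selectKey(Function<K, K2> mapper);
}

final class PartitionedToyKStream<K> implements ToyKStream<K> {
    private final int repartitionTopics;

    PartitionedToyKStream(int repartitionTopics) {
        this.repartitionTopics = repartitionTopics;
    }

    @Override
    public <K2> NonPartitionedToyKStream<K2> selectKey(Function<K, K2> mapper) {
        // Any key change loses the partitioning guarantee (mapper unused in this data-free model).
        return new NonPartitionedToyKStream<>(repartitionTopics);
    }

    // Grouping is only available on a partitioned stream; returns the topic count.
    int groupByKey() {
        return repartitionTopics;
    }
}

final class NonPartitionedToyKStream<K> implements ToyKStream<K> {
    private final int repartitionTopics;

    NonPartitionedToyKStream(int repartitionTopics) {
        this.repartitionTopics = repartitionTopics;
    }

    @Override
    public <K2> NonPartitionedToyKStream<K2> selectKey(Function<K, K2> mapper) {
        return new NonPartitionedToyKStream<>(repartitionTopics);
    }

    // The "cast operator": the caller asserts co-partitioning; no topic is added.
    PartitionedToyKStream<K> markAsPartitioned() {
        return new PartitionedToyKStream<>(repartitionTopics);
    }

    // Models the repartition step the DSL would otherwise insert.
    PartitionedToyKStream<K> repartition() {
        return new PartitionedToyKStream<>(repartitionTopics + 1);
    }
}
```

Calling `groupByKey()` directly after `selectKey(...)` does not compile in this model, which captures the "cast" reading; the cast itself is still unchecked, so the responsibility Matthias assigns to the user remains.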