Hi all,

Since there has been no activity around this KIP, I'll pick it up in the coming weeks and continue the discussion.
Best,
Levani

On 27. Apr 2022, at 22:50, Matthias J. Sax <mj...@apache.org> wrote:

Let's wait a couple of days to give Ivan a chance to reply. If he does not reply, feel free to pick it up.

-Matthias

On 4/26/22 3:58 AM, Levani Kokhreidze wrote:

Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.

Levani

On 23. Apr 2022, at 02:43, Matthias J. Sax <mj...@apache.org> wrote:

Ivan,

are you still interested in this KIP? I think it would be a good addition.

-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the point that the "new key" would be a "superkey", and thus it should always be possible to compute the original key from the superkey. (As a matter of fact, for windowed tables the windowed key is also a superkey...)

I am not sure if we need to follow the "use the head" idea or if we need a "CompositeKey" interface? It seems we can just allow any types and be agnostic to them?

    KStream<K, V> stream = ...;
    KStream<SK, V> stream2 =
        stream.selectKey(/* set superkey */)
              .markAsPartitioned();

We only need a `Function<SK, K>`, without any restrictions on the type, to map the "superkey" to the original "partition key"?

Do you propose to provide the "reverse mapper" via the `markAsPartitioned()` method (or config object), or via the IQ methods? Not sure which one is better.

However, I am not sure if it would solve the join problem? At least not easily: if one has two `KStream<Tuple, ...>`, and one is properly partitioned by `Tuple` while the other one is "marked-as-partitioned", the join would just fail. The same goes for a stream-table join.
The only fix would be to do the repartitioning anyway, effectively ignoring the "user hint", but that seems to defeat the purpose. Again, I would argue that it is ok not to handle this case, and to leave it as the user's responsibility not to mess it up.

-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1. markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here; both Sophie's and Matthias' points look convincing to me.

I think we should estimate the following: what is the probability that we will ever need to extend `selectKey` etc. with a config for purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring: deprecate the overloads with `Named` and introduce overloads with dedicated configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing APIs and to introduce a single `markAsPartitioned()` method, which itself can easily be deprecated if we find a better solution later!

==2. The IQ problem==

> it then has to be the case that
>
> Partitioner.partition(key) == Partitioner.partition(map(key))

Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

    \forall k_1, k_2:\ \mathrm{map}(k_1) = \mathrm{map}(k_2) \Rightarrow \mathrm{partition}(k_1) = \mathrm{partition}(k_2)

or, by the contraposition law,

    \forall k_1, k_2:\ \mathrm{partition}(k_1) \neq \mathrm{partition}(k_2) \Rightarrow \mathrm{map}(k_1) \neq \mathrm{map}(k_2)

(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This is actually what we often do before aggregation, and this mapping does not require a repartition: if two tuples are equal, their heads (the original keys) are equal, so the records already sit in the same partition.
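The property above can be checked mechanically on sample data. What follows is a minimal Java sketch built on a toy model. `Rec`, `Tuple`, and `PartitionPropertyCheck` are hypothetical names. The partitioner is a stand-in that reduces `Object.hashCode()` modulo the partition count, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2. The sketch reports whether any two records with equal new keys come from different partitions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical toy types, not Kafka Streams classes.
record Rec<K, V>(K key, V value) {}      // a (key, value) record
record Tuple<H, T>(H head, T tail) {}    // the "superkey" Tuple(key, anyValue)

final class PartitionPropertyCheck {
    // Stand-in partitioner: Kafka's default hashes the *serialized* key with murmur2;
    // Object.hashCode() is used here only to keep the sketch self-contained.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Checks, on a finite sample, that map(r1) == map(r2) implies
    // partition(r1.key) == partition(r2.key), i.e. every new key is only
    // ever observed in a single one of the existing partitions.
    static <K, V, NK> boolean preservesPartitioning(
            List<Rec<K, V>> sample, BiFunction<K, V, NK> map, int numPartitions) {
        Map<NK, Integer> partitionOfNewKey = new HashMap<>();
        for (Rec<K, V> r : sample) {
            NK newKey = map.apply(r.key(), r.value());
            int p = partition(r.key(), numPartitions);
            Integer seen = partitionOfNewKey.putIfAbsent(newKey, p);
            if (seen != null && seen != p) {
                return false; // equal new keys in different partitions: repartition required
            }
        }
        return true;
    }
}
```

Consider a sample with two partitions, where key 1 lands in partition 1 and key 2 in partition 0. The mapping `key -> Tuple(key, value)` passes. A mapping that uses the value alone as the new key fails, because the same value shows up under both keys. A finite sample can only refute the property, never prove it. For Tuple(key, anyValue), the proof is the one-liner in the text: equal tuples have equal heads.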
But of course we can extract the original key from Tuple(key, anyValue), and this can save IQ and joins!

This is what I mean when I talk about the 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples (CompositeKeys) and uses only the 'head' to calculate the partition;

2. implement

    <T> KStream<CompositeKey<K, T>, V> selectCompositeKey(
            BiFunction<? super K, ? super V, ? extends T> tailSelector) {
        // MARK_AS_PARTITIONED call goes here (chained onto the result),
        // but this call is an implementation detail and we do not expose
        // markAsPartitioned publicly!
        return selectKey((k, v) -> new CompositeKey<>(k, tailSelector.apply(k, v)));
    }

WDYT? (it's just a brainstorming idea)

On 09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs extending `selectKey()` et al. with a config. While I agree that it might be conceptually cleaner to use a config object, I did not propose it because the API impact (deprecating stuff and adding new stuff) is quite big... If we think it's an acceptable price to pay, I am ok with it though.

I also think that `markAsPartitioned()` could actually be categorized as an operator... We don't expose it in the API as a first-class citizen atm, but in fact we have two types of `KStream`: a "PartitionedKStream" and a "NonPartitionedKStream". Thus, `markAsPartitioned()` can be seen as a "cast operator" that converts the one into the other.

I also think that the raised concern about "forgetting to remove `markAsPartitioned()`" might not be very strong. If you have different places in the code that link stuff together, the calls `selectKey().markAsPartitioned()` must always go together.
If you have some other place in the code that gets a `KStream` passed as input, it would be "invalid" to blindly call `markAsPartitioned()`, as you don't know anything about the upstream code. Of course, it requires some "coding discipline" to follow this pattern... Also, users can shoot themselves in the foot with the config object pattern too, if they want: if you get a `KStream` passed in, you can skip repartitioning via `selectKey((k, v) -> k, Config.markAsPartitioned())`. Thus, I still slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have exposed a `PartitionedKStream` as a first-class object to begin with... Hard to introduce now, I guess...)

The concern about IQ is interesting; I did not realize this impact. Thanks for bringing it up.

> a repartition would be a no-op, ie that the stream (and its partitioning) would be the same whether or not a repartition is inserted. For this to be true, it then has to be the case that
>
> Partitioner.partition(key) == Partitioner.partition(map(key))

@Sophie: I don't think this statement is correct. `markAsPartitioned()` only means that the existing partitioning ensures that all messages with the same new key are still in the same partition, i.e., it cannot happen that two equal new keys end up in different partitions.

However, if you were to physically repartition on the new key using the same hash function as for the old key, there is no guarantee that the same partitions would be picked... And that is why IQ breaks downstream.

Btw: using `markAsPartitioned()` could also be an issue for joins, for the same reason...
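To see concretely why IQ (and joins) break, the following minimal Java sketch compares two locations for a record. The first is where the record actually sits after `selectKey(...)` plus `markAsPartitioned()`. The second is where a physical repartition on the new key would put it. `Pair`, `CoPartitioning`, and the `hashCode`-modulo partitioner are hypothetical stand-ins; Kafka's default partitioner hashes serialized bytes with murmur2.

```java
import java.util.Objects;

// Hypothetical superkey type: (original key, extra component). Not a Kafka Streams class.
record Pair<H, T>(H head, T tail) {}

final class CoPartitioning {
    // Stand-in partitioner over Object.hashCode(); Kafka's default uses murmur2 on serialized bytes.
    static int hashPartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Where a record with this key would land after a *physical* repartition on the
    // new key; this is also where a lookup that hashes the new key would search.
    static int afterRepartition(Pair<?, ?> newKey, int numPartitions) {
        return hashPartition(newKey, numPartitions);
    }

    // Where the record actually is after selectKey(...) + markAsPartitioned():
    // it never moved, so its partition is still determined by the original key.
    static int afterMarkAsPartitioned(Pair<?, ?> newKey, int numPartitions) {
        return hashPartition(newKey.head(), numPartitions);
    }
}
```

For most tails the two partitions differ. So a lookup routed by hashing the new key, or a join partner that was physically repartitioned on it, goes to the wrong task. This happens even though all records with equal new keys are correctly co-located with each other.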
I want to call out, though, that the Jira tickets that raised the concern about unnecessary repartitioning are about downstream aggregations...

Last but not least: we actually have a similar situation for windowed aggregations. The result of a window aggregation is partitioned by the "plain key": if we wrote the result (whose key is the windowed key) into a topic using the same partitioning function, we would write to different partitions... (I guess it was never an issue so far, as we don't have KIP-300 in place yet...)

It's also not an issue for IQ, because we know the plain key, and thus can route to the right task.

About a solution: I think it might be ok to say we don't need to solve this problem, and that it's the user's responsibility to take IQ into account. I.e., if they want to use IQ downstream, they need to repartition: for this case, repartitioning is _NOT_ unnecessary... The same argument seems to apply to the join case I mentioned above. Given that `markAsPartitioned()` is an advanced feature, it seems ok to leave it to the user to use it correctly (we should of course call it out in the docs!).

-Matthias

On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:

Before I dive into the question of IQ and the approaches you proposed, can you just elaborate on the problem itself? By definition, the `markAsPartitioned` flag means that a repartition would be a no-op, i.e., that the stream (and its partitioning) would be the same whether or not a repartition is inserted.
For this to be true, it then has to be the case that

    Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the partition number that a key belongs to when using IQ. It shouldn't matter whether the user is querying a key in a store upstream of the key-changing operation or a mapped key downstream of it: either way, we just apply the given Partitioner.

See StreamsMetadataState#getKeyQueryMetadataForKey <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283> for where this happens.

If we're concerned that users might try to abuse the new `markAsPartitioned` feature, or accidentally misuse it, then we could add a runtime check. It would apply the Partitioner associated with that subtopology to both the key being processed and the mapped key, and assert that the resulting partitions do indeed match. Imo this is probably overkill, just putting it out there.

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Sophie,

thanks for your reply! So your proposal is:

1) For each key-changing operation, deprecate the existing overloads that accept a Named, and replace them with overloads that take an operator-specific config object.
2) Add a `markAsPartitioned` flag to these configs.

IMO, this looks much better than the original proposal. I like it very much, and I think I will rewrite the KIP soon. I absolutely agree with your points.
Repartition logic is not part of the public contract, and it's much better to give it correct hints than to tell it explicitly what to do.

...

Since we're generating such bright ideas, maybe we should also brainstorm the interactive query problem?

The problem is that interactive queries will not work properly when `markAsPartitioned` is used. Although the original key and the mapped key will be in the same partition, we will no longer be able to determine this partition given only the mapped key.

The possible approaches are:

1) Give up and don't use interactive queries together with `markAsPartitioned`. This is what I assume for now. But can we do better?

2) Maybe we should ask the user to provide a 'reverse mapping' that allows IQ to restore the original key in order to choose the correct partition. We can place this mapping in our new configuration object. Of course, there is no way for KStreams to verify at compile time or startup time that this function is actually the reverse mapping that extracts the old key from the new one. Users will forget to provide this function. Users will provide wrong functions. This all looks too fragile.

3) Maybe there can be a completely different approach. Let's introduce a new entity: composite keys, consisting of a "head" and a "tail". The partition for a composite key is calculated based on its 'head' value only. If we provide a key mapping of the form key -> CompositeKey(key, tail), then it's obvious that we do not need a repartition. When an interactive query needs to determine the partition for a CompositeKey, it just extracts the head and calculates the correct partition.
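Options 2 and 3 can be made concrete with a small, self-contained Java sketch. `CompositeKey`, `HeadRouting`, and the `hashCode`-modulo partitioner are hypothetical stand-ins, and no such Kafka Streams API exists. The sketch shows that routing a lookup by the head (option 3), or through a reverse mapper `Function<SK, K>` (option 2), recovers the partition of the original key.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical composite key from option 3: a "head" (original key) plus a "tail".
record CompositeKey<H, T>(H head, T tail) {}

final class HeadRouting {
    // Stand-in partitioner; Kafka's default hashes serialized bytes with murmur2.
    static int hashPartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Option 3: a 'partitioner wrapper' that recognizes composite keys
    // and computes the partition from the head only; other keys pass through.
    static int headAwarePartition(Object key, int numPartitions) {
        Object effective = (key instanceof CompositeKey<?, ?> ck) ? ck.head() : key;
        return hashPartition(effective, numPartitions);
    }

    // Option 2: route an IQ lookup for a superkey SK through a user-supplied
    // reverse mapper back to the original partition key K.
    static <SK, K> int routeWithReverseMapper(SK superKey, Function<SK, K> reverse, int numPartitions) {
        return hashPartition(reverse.apply(superKey), numPartitions);
    }
}
```

A CompositeKey is the special case where the reverse mapper is `CompositeKey::head`. The difference in design is where correctness comes from. An arbitrary reverse mapper cannot be verified by the library, which is the fragility raised under option 2. A head-aware partitioner makes the inverse part of the key's structure.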
We can select a CompositeKey before groupByKey() and aggregation operations, and this will not involve a repartition. And IQ will work.

Is it too daring an idea? WDYT? My concern: will it cover all the cases where we want to choose a different key but also avoid a repartition?

Regards,

Ivan

On 06.08.2021 23:19, Sophie Blee-Goldman wrote:

Hey Ivan,

I completely agree that adding it as a config to Grouped/Joined/etc isn't much better. I was just listing it for completeness, and I would prefer to make it a configuration of the key-changing operation itself. That's what I meant by

> a better alternative might be to introduce this ... to the config object of
> the operator that's actually
> doing the key changing operation

I personally believe this is the semantically "correct" way to approach this, since "preserves partitioning" or "does not preserve partitioning" is a property of a key-changing operation and not an operation on the stream itself. Also, this way the user need only tell Streams which operations do or do not preserve the partitioning, and Streams can figure out where to insert a repartition in the topology, as it does today.

Otherwise, we're rendering this particularly useful feature of the DSL (automatic repartitioning) pretty much useless, since the user now has to figure out whether a repartition is needed.
On top of that, they need to have some understanding of where and when this internal automatic repartitioning logic is going to insert that repartition, in order to cancel it in the appropriate place. That is pretty unfortunate, since that logic is not part of the public contract: it can change at any time, for example as it did when we introduced the repartition merging optimization.

All that said, those are valid concerns regarding the expansion of the API's surface area. Since none of the key-changing operations currently has a config object like some other operations do (for example Grouped or Consumed), this would double the number of overloads. But maybe this is a good opportunity to fix that problem, rather than keep digging ourselves into holes by trying to work around it.

It looks like all of those key-changing operations have two overloads at the moment. One has no parameters beyond the operation itself (e.g. the KeyValueMapper for #selectKey). The other has an additional Named parameter, which is itself another kind of configuration. What if we instead deprecate the existing overloads that accept a Named, and replace them with overloads that take an operator-specific config object, like we do elsewhere (e.g. Grouped for #groupByKey)?
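One way to picture such an operator-specific config object is a rough Java sketch in the static-factory / `with...` style that `Grouped` uses (for example `Grouped.as(name)` and `withName(...)`). `SelectKeyConfig` and its methods are hypothetical. No such class exists in Kafka Streams, and the `selectKey(mapper, config)` overload it implies is only a proposal in this thread.

```java
import java.util.Optional;

// Hypothetical operator-specific config for selectKey, loosely modeled on the
// static-factory / "with..." style of Grouped. NOT an existing Kafka Streams class.
final class SelectKeyConfig {
    private final String name;                // would replace the Named overload
    private final boolean markAsPartitioned;  // the flag discussed in this KIP

    private SelectKeyConfig(String name, boolean markAsPartitioned) {
        this.name = name;
        this.markAsPartitioned = markAsPartitioned;
    }

    // Static entry points, one per property, as Grouped does.
    static SelectKeyConfig as(String name) {
        return new SelectKeyConfig(name, false);
    }

    static SelectKeyConfig markAsPartitioned() {
        return new SelectKeyConfig(null, true);
    }

    // Immutable "with" methods: later KIPs can add properties here
    // without adding new selectKey overloads.
    SelectKeyConfig withName(String newName) {
        return new SelectKeyConfig(newName, markAsPartitioned);
    }

    SelectKeyConfig withMarkAsPartitioned(boolean flag) {
        return new SelectKeyConfig(name, flag);
    }

    Optional<String> name() {
        return Optional.ofNullable(name);
    }

    boolean isMarkedAsPartitioned() {
        return markAsPartitioned;
    }
}
```

A call site might then read `stream.selectKey(mapper, SelectKeyConfig.as("my-select").withMarkAsPartitioned(true))`. Immutability matters here because a config instance can then be shared between operators without one call silently changing another.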
Then we can make both Named and this `markAsPartitioned` flag part of the general config object. This (a) does not expand the API surface area at all in this KIP, and (b) also protects future KIPs from having this same conversation over and over, because we can stick any additional operator properties into that same config object.

WDYT? By the way, the above idea (introducing a single config object to wrap all operator properties) was also raised by John Roesler a while back. Let's hope he hasn't changed his mind since then :)

On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes what this operation is actually doing!

Hi Sophie,

I see the concern about poor code cohesion. We declare the key mapping in one place in the code, and then later, in another place, we say "markAsPartitioned()". When we change the code six months later, we might forget to remove markAsPartitioned(), especially if it's placed in another method or class. But I don't understand why you propose to include this config in Grouped/Joined/StreamJoined, because from this point of view it's not a better solution, is it?
The best approach regarding cohesion might be to add an extra 'preservePartition' flag to every key-changing operation, that is

1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell whether the provided mapping requires a repartition or not. Indeed, this is a property of the mapping operation, not of the grouping one! BTW: the idea of adding an extra parameter to `selectKey` was once coined by John Roesler.

Arguments in favour of this approach: 1) better code cohesion from the user's point of view, 2) 'smarter' code (the decision is taken based on the metadata provided for all the upstream mappings), 3) overall safer for the user.

Arguments against: an invasive KStreams API change and 5 more method overloads. Further on, when we add a new key-changing operation to KStream, we must also add an overloaded version with 'preservePartition'. When we add a new overload for an existing operation, we might actually need to add two or more overloaded versions. This will soon become a mess.

I thought that since `markAsPartitioned` is intended for advanced users, they will use it with care. When you're in a position where every serialization/deserialization round trip matters for latency, you're extremely careful with the topology. You will not thoughtlessly add new key-changing operations without controlling how they change the overall topology.
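A toy planner can illustrate two things at once. The first is argument 2) above: a repartition decision derived from metadata on all upstream mappings. The second is the pitfall Sophie raises in her 4 Aug message against a stream-level cancel. This is a hedged sketch that rests on an assumption: the DSL tracks a single "repartition required" state, which key-changing operations set and stateful operations consume. That is roughly how automatic repartitioning behaves today. `Op`, `KeyChange`, `MarkAsPartitioned`, `Stateful`, and `RepartitionPlanner` are hypothetical names, not Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of a linear DSL chain (not Kafka Streams internals).
interface Op {}
record KeyChange(String name, boolean preservesPartitioning) implements Op {} // selectKey, map, ...
record MarkAsPartitioned() implements Op {}                                    // stream-level hint
record Stateful(String name) implements Op {}                                  // aggregation or join

final class RepartitionPlanner {
    // Returns the names of the stateful operations that get a repartition step in front of them.
    static List<String> plan(List<Op> chain) {
        List<String> repartitioned = new ArrayList<>();
        boolean repartitionRequired = false;
        for (Op op : chain) {
            if (op instanceof KeyChange kc) {
                // Per-operation flag: only key changes that do NOT preserve partitioning
                // make a repartition necessary, and the requirement accumulates.
                repartitionRequired |= !kc.preservesPartitioning();
            } else if (op instanceof MarkAsPartitioned) {
                // Stream-level hint: unconditionally clears whatever accumulated upstream.
                repartitionRequired = false;
            } else if (op instanceof Stateful s && repartitionRequired) {
                repartitioned.add(s.name());
                repartitionRequired = false; // data is correctly partitioned again
            }
        }
        return repartitioned;
    }
}
```

With per-operation flags, the earlier non-preserving key changes still force a repartition before the aggregation, even if the last key change preserves partitioning. With a stream-level `markAsPartitioned()` placed just before the aggregation, the same required repartition is silently dropped.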
By the way, if we later find a better solution, it's much easier to deprecate a single `markAsPartitioned` operation than 5 method overloads.

What do you think?

On 04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original name for this operator, `cancelRepartition()`, is itself a sign that this is not an operation on the stream itself. It is rather a command/request to whichever operator would otherwise have triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined config objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need to specify this for every stateful operation downstream of the key-changing operation. So a better alternative might be to introduce this `skipRepartition` field, or whatever we want to call it, to the config object of the operator that's actually doing the key-changing operation, which is apparently preserving the partitioning.

Imo this would be "safer" than the current proposal, as the user has to explicitly consider whether every key-changing operation is indeed preserving the partitioning. Otherwise you could code up a topology with several key-changing operations at the beginning which do require repartitioning.
Then you get to the end of the topology, insert one final key-changing operation that doesn't, assume you can just cancel the repartition, and suddenly you're wondering why your results are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP, Ivan!

I think it's a good feature to give advanced users more control and allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming things" discussion :))

Did you consider alternatives? What about

- markAsPartitioned()
- markAsKeyed()
- skipRepartition()

Not sure if there are other ideas for a good name?

-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

This is an offshoot of the discussion of KIP-655 for a `distinct` operator, which turned out to be a separate proposal.

The proposal is quite trivial; however, we still might consider alternatives (see the 'Possible Alternatives' section).

Regards,

Ivan