Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream<K, V> stream = ...
KStream<SK, V> stream2 =
  stream.selectKey(/*set superkey*/)
        .markAsPartitioned()

We only need a `Function<SK, K>` without any restrictions on the type,
to map the "superkey" to the original "partition key"?
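
As a rough illustration of the reverse-mapper idea, IQ-style routing for a superkey could first map it back to the partition key and then partition on that. This is a minimal sketch, not Kafka Streams API: `SuperKeyRouting`, `OrderKey`, `partition` and `partitionForSuperKey` are hypothetical names, and the hash-modulo partitioner is only a stand-in for whatever partitioner is actually in use.

```java
import java.util.function.Function;

// Hypothetical sketch; none of these names are Kafka Streams API.
final class SuperKeyRouting {

    // Illustrative superkey: the original partition key (customerId) plus an extra field.
    record OrderKey(String customerId, long orderId) {}

    // Stand-in for a partitioner: hash of the key modulo the partition count.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route a lookup by first mapping the superkey back to the original partition key.
    static <SK, K> int partitionForSuperKey(SK superKey,
                                            Function<SK, K> toPartitionKey,
                                            int numPartitions) {
        return partition(toPartitionKey.apply(superKey), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        OrderKey first = new OrderKey("alice", 1L);
        OrderKey second = new OrderKey("alice", 2L);

        // Both superkeys share the partition key "alice", so they route identically.
        System.out.println(partitionForSuperKey(first, OrderKey::customerId, numPartitions));
        System.out.println(partitionForSuperKey(second, OrderKey::customerId, numPartitions));
    }
}
```

Both superkeys route to the partition of "alice", which is where the data that was never repartitioned still lives; no restriction on the superkey type is needed beyond the `Function<SK, K>` itself.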


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream<Tuple,...> and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, and to leave it as the
user's responsibility to not mess it up.
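
To illustrate why the join breaks, compare where the same `Tuple` key lands in a stream that was physically partitioned by `Tuple` versus one that was only marked as partitioned and therefore still sits where the original key put it. This is a minimal sketch under assumed names: `CoPartitioningCheck`, `Tuple` and both partition functions are hypothetical, and hash-modulo stands in for the real partitioner.

```java
import java.util.Objects;

// Hypothetical sketch; Tuple and both partition functions are illustrative stand-ins.
final class CoPartitioningCheck {

    record Tuple(String head, int tail) {}

    // Stream A: physically partitioned on the full Tuple.
    static int partitionByTuple(Tuple key, int numPartitions) {
        return Math.floorMod(Objects.hash(key.head(), key.tail()), numPartitions);
    }

    // Stream B: "marked as partitioned", so records still sit where the original key (head) put them.
    static int partitionByHead(Tuple key, int numPartitions) {
        return Math.floorMod(key.head().hashCode(), numPartitions);
    }

    // Counts the keys (head, 0..tails-1) for which the two streams disagree on the partition.
    static int countMismatches(String head, int tails, int numPartitions) {
        int mismatches = 0;
        for (int tail = 0; tail < tails; tail++) {
            Tuple key = new Tuple(head, tail);
            if (partitionByTuple(key, numPartitions) != partitionByHead(key, numPartitions)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("keys not co-located: " + countMismatches("a", 4, 4));
    }
}
```

Any key counted here would have its two join inputs on different partitions (and thus in different tasks), so the join could not match them unless a repartition is inserted anyway.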


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
> Hi Matthias and Sophie!
> 
> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
> 
> I don't have a strong opinion here, both Sophie's and Matthias' points
> look convincing to me.
> 
> I think we should estimate the following: what is the probability that
> we will ever need to extend `selectKey` etc. with a config for the
> purposes other than `markAsPartitioned`?
> 
> If we find this probability high, then it's just a refactoring to
> deprecate overloads with `Named` and introduce overloads with dedicated
> configs, and we should do it this way.
> 
> If it's low or zero, maybe it's better not to mess with the existing
> APIs and to introduce a single `markAsPartitioned()` method, which
> itself can be easily deprecated if we find a better solution later!
> 
> 
> ==2. The IQ problem==
> 
>> it then has to be the case that
> 
>> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> 
> Sophie, you got this wrong, and Matthias already explained why.
> 
> The actual required property for the mapping function is:
> 
> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
> 
> or, by contraposition law,
> 
> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
> 
> 
> (look at the whiteboard photo that I attached to the KIP).
> 
> There is a big class of such mappings: key -> Tuple(key, anyValue). This
> is actually what we often do before aggregation, and this mapping does
> not require repartition.
> 
> But of course we can extract the original key from Tuple(key, anyValue),
> and this can save IQ and joins!
> 
> This is what I'm talking about when I talk about 'CompositeKey' idea.
> 
> We can do the following:
> 
> 1. implement a 'partitioner wrapper' that recognizes tuples
> (CompositeKeys) and uses only the 'head' to calculate the partition,
> 
> 2. implement
> 
> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>   //MARK_AS_PARTITIONED call here,
>   //but this call is an implementation detail and we do not expose
>   //markAsPartitioned publicly!
> }
> 
> WDYT? (it's just a brainstorming idea)
> 
> 09.08.2021 2:38, Matthias J. Sax wrote:
>> Hi,
>>
>> I originally had a similar thought about `markAsPartitioned()` vs
>> extending `selectKey()` et al. with a config. While I agree that it
>> might be conceptually cleaner to use a config object, I did not propose
>> it as the API impact (deprecating stuff and adding new stuff) is quite
>> big... If we think it's an acceptable price to pay, I am ok with it
>> though.
>>
>> I also do think, that `markAsPartitioned()` could actually be
>> categorized as an operator... We don't expose it in the API as
>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>> one into the other.
>>
>> I also think that the raised concern about "forgetting to remove
>> `markAsPartitioned()`" might not be very strong though. If you have
>> different places in the code that link stuff together, a call to eg.
>> `selectKey().markAsPartitioned()` must always go together. If you have
>> some other place in the code that gets a `KStream` passed as input, it
>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>> know anything about the upstream code. Of course, it requires some
>> "coding discipline" to follow this pattern... Also, users can shoot
>> themselves in the foot if they want with the config object pattern,
>> too: if you get a `KStream` passed in, you can skip repartitioning via
>> `selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
>> slightly prefer to add `markAsPartitioned()` as an operator.
>>
>> (Maybe we should have exposed a `PartitionedKStream` as a first-class
>> object to begin with... Hard to introduce now I guess...)
>>
>>
>> The concern about IQ is interesting -- I did not realize this impact.
>> Thanks for bringing it up.
>>
>>> a repartition would be a no-op, ie that the stream (and its
>>> partitioning)
>>> would be the same
>>> whether or not a repartition is inserted. For this to be true, it
>>> then has
>>> to be the case that
>>>
>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>
>> @Sophie: I don't think this statement is correct. A `markAsPartitioned()`
>> only means that the existing partitioning ensures that all messages of
>> the same new key are still in the same partition. Ie, it cannot happen
>> that two new keys (that are the same) are in different partitions.
>>
>> However, if you would physically repartition on the new key using the
>> same hash-function as for the old key, there is no guarantee that the
>> same partitions would be picked... And that is why IQ breaks downstream.
>>
>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>> the same reason... I want to call out, that the Jira tickets that did
>> raise the concern about unnecessary repartitioning are about downstream
>> aggregations though...
>>
>> Last but not least: we actually have a similar situation for
>> windowed-aggregations: The result of a window aggregation is partitioned
>> by the "plain key": if we write the result into a topic using the same
>> partitioning function, we would write to different partitions... (I
>> guess it was never an issue so far, as we don't have KIP-300 in place
>> yet...)
>>
>> It's also not an issue for IQ, because we know the plain key, and thus
>> can route to the right task.
>>
>>
>> About a solution: I think it might be ok to say we don't need to solve
>> this problem, but it's the user's responsibility to take IQ into account.
>> Ie, if they want to use IQ downstream, they need to repartition: for this
>> case, repartitioning is _NOT_ unnecessary... The same argument seems to
>> apply for the join case I mentioned above. -- Given that
>> `markAsPartitioned()` is an advanced feature, it seems ok to leave it to
>> the user to use correctly (we should of course call it out in the docs!).
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>> Before I dive in to the question of IQ and the approaches you
>>> proposed, can
>>> you just
>>> elaborate on the problem itself? By definition, the `markAsPartitioned`
>>> flag means that
>>> a repartition would be a no-op, ie that the stream (and its
>>> partitioning)
>>> would be the same
>>> whether or not a repartition is inserted. For this to be true, it
>>> then has
>>> to be the case that
>>>
>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>
>>> The left-hand side of the above is precisely how we determine the
>>> partition
>>> number that
>>> a key belongs to when using IQ. It shouldn't matter whether the user is
>>> querying a key
>>> in a store upstream of the key-changing operation or a mapped key
>>> downstream of it
>>> -- either way we just apply the given Partitioner.
>>>
>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>
>>> for where this happens
>>>
>>>
>>> If we're concerned that users might try to abuse the new
>>> `markAsPartitioned` feature,
>>> or accidentally misuse it, then we could add a runtime check that
>>> applies
>>> the Partitioner
>>> associated with that subtopology to the key being processed and the
>>> mapped
>>> key result
>>> to assert that they do indeed match. Imo this is probably overkill, just
>>> putting it out there.
>>>
>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev
>>> <iponoma...@mail.ru.invalid>
>>> wrote:
>>>
>>>> Hi Sophie,
>>>>
>>>> thanks for your reply! So your proposal is:
>>>>
>>>> 1). For each key-changing operation, deprecate the existing overloads
>>>> that accept a Named, and replace them with overloads that take an
>>>> operator-specific config object.
>>>> 2). Add `markAsPartitioned` flag to these configs.
>>>>
>>>> IMO, this looks much better than the original proposal, I like it very
>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>> your points. Repartition logic is not a part of the public contract,
>>>> and
>>>> it's much better to give it correct hints instead of telling explicitly
>>>> what it should do.
>>>>
>>>> ...
>>>>
>>>> Since we're generating such bright ideas, maybe we should also
>>>> brainstorm the interactive query problem?
>>>>
>>>> The problem is that interactive queries will not work properly when
>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>> be in the same partition, we will no longer be able to guess this
>>>> partition given the mapped key only.
>>>>
>>>> The possible approaches are:
>>>>
>>>> 1) Give up and don't use interactive queries together with
>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>
>>>> 2) Maybe we should ask the user to provide 'reverse mapping' that will
>>>> allow IQ to restore the original key in order to choose the correct
>>>> partition. We can place this mapping in our new configuration
>>>> object. Of
>>>> course, there is no way for KStreams to verify in compile time/startup
>>>> time that this function is actually the reverse mapping that
>>>> extracts
>>>> the old key from the new one. Users will forget to provide this
>>>> function. Users will provide wrong functions. This all looks too
>>>> fragile.
>>>>
>>>> 3) Maybe there can be a completely different approach. Let's
>>>> introduce a
>>>> new entity -- composite keys, consisting of "head" and "tail". The
>>>> partition for the composite key is calculated based on its 'head' value
>>>> only. If we provide a key mapping in form key -> CompositeKey(key,
>>>> tail), then it's obvious that we do not need a repartition. When an
>>>> interactive query needs to guess the partition for CompositeKey, it
>>>> just
>>>> extracts its head and calculates the correct partition.
>>>>
>>>> We can select CompositeKey before groupByKey() and aggregation
>>>> operations, and this will not involve repartition. And IQ will work.
>>>>
>>>> Is it too daring an idea, WDYT? My concern: will it cover all the cases
>>>> when we want to choose a different key, but also avoid repartition?
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>>
>>>>
>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>> Hey Ivan
>>>>>
>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>> isn't
>>>>> much better, I was just
>>>>> listing it for completeness, and that I would prefer to make it a
>>>>> configuration of the key-changing
>>>>> operation itself -- that's what I meant by
>>>>>
>>>>> a better alternative might be to introduce this ... to the config
>>>>> object
>>>> of
>>>>>> the operator that's actually
>>>>>
>>>>> doing the key changing operation
>>>>>
>>>>>
>>>>> I personally believe this is the semantically "correct" way to
>>>>> approach
>>>>> this, since "preserves partitioning"
>>>>> or "does not preserve partitioning" is a property of a key-changing
>>>>> operation and not an operation on the
>>>>> stream itself. Also, this way the user need only tell Streams which
>>>>> operations do or do not preserve the
>>>>> partitioning, and Streams can figure out where to insert a
>>>>> repartition in
>>>>> the topology as it does today.
>>>>>
>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>> DSL --
>>>>> automatic repartitioning -- pretty
>>>>> much useless, since the user now has to figure out whether a
>>>>> repartition
>>>> is
>>>>> needed. On top of that, they
>>>>> need to have some understanding of where and when this internal
>>>>> automatic
>>>>> repartitioning logic is going
>>>>> to insert that repartition in order to cancel it in the appropriate
>>>> place.
>>>>> Which is pretty unfortunate, since
>>>>> that logic is not part of the public contract: it can change at any
>>>>> time,
>>>>> for example as it did when we introduced
>>>>> the repartition merging optimization.
>>>>>
>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>> API's surface area. Since none of
>>>>> the key-changing operations currently have a config object like some
>>>> other
>>>>> operations (for example Grouped
>>>>> or Consumed, etc), this would double the number of overloads. But
>>>>> maybe
>>>>> this is a good opportunity to fix
>>>>> that problem, rather than keep digging ourselves into holes by
>>>>> trying to
>>>>> work around it.
>>>>>
>>>>> It looks like all of those key-changing operations have two
>>>>> overloads at
>>>>> the moment, one with no parameters
>>>>> beyond the operation itself (eg KeyValueMapper for #selectKey) and the
>>>>> other with an additional Named
>>>>> parameter, which is itself another kind of configuration. What if we
>>>>> instead deprecate the existing overloads
>>>>> that accept a Named, and replace them with overloads that take an
>>>>> operator-specific config object like we do
>>>>> elsewhere (eg Grouped for #groupByKey). Then we can have both Named
>>>>> and
>>>>> this  `markAsPartitioned` flag
>>>>> be part of the general config object, which (a) does not expand the
>>>>> API
>>>>> surface area at all in this KIP, and (b)
>>>>> also protects future KIPs from needing to have this same conversation
>>>> over
>>>>> and over, because we can now
>>>>> stick any additional operator properties into that same config object.
>>>>>
>>>>> WDYT? By the way, the above idea (introducing a single config
>>>>> object to
>>>>> wrap all operator properties) was also
>>>>> raised by John Roesler a while back. Let's hope he hasn't changed his
>>>> mind
>>>>> since then :)
>>>>>
>>>>>
>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev
>>>>> <iponoma...@mail.ru.invalid
>>>>>
>>>>> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>> describes
>>>>>> what this operation is actually doing!
>>>>>>
>>>>>> Hi Sophie,
>>>>>>
>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>> one place of code, then later in another place we say
>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>> might forget to remove markAsPartitioned(), especially if it's
>>>>>> placed in
>>>>>> another method or class. But I don't understand why you propose to
>>>>>> include this config into Grouped/Joined/StreamJoined, because from
>>>>>> this
>>>>>> point of view it's not a better solution?
>>>>>>
>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>
>>>>>> 1) selectKey
>>>>>> 2) map
>>>>>> 3) flatMap
>>>>>> 4) transform
>>>>>> 5) flatTransform
>>>>>>
>>>>>> in order to tell if the provided mapping requires repartition or not.
>>>>>> Indeed, this is a mapping operation property, not grouping one!
>>>>>> BTW: the
>>>>>> idea of adding extra parameter to `selectKey` was once coined by John
>>>>>> Roesler.
>>>>>>
>>>>>> Arguments in favour of this approach: 1) better code cohesion
>>>>>> from the
>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>> overall safer for the user.
>>>>>>
>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>> When we add a new overloaded version for existing operation, we
>>>>>> actually
>>>>>> might need to add two or more overloaded versions. This will soon
>>>>>> become
>>>>>> a mess.
>>>>>>
>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>> users,
>>>>>> they will use it with care. When you're in a position where every
>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>> extremely careful with the topology and you will not thoughtlessly
>>>>>> add
>>>>>> new key-changing operations without controlling how it's going to
>>>>>> change
>>>>>> the overall topology.
>>>>>>
>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>> overloads.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>> Do we really need a whole DSL operator for this? I think the
>>>>>>> original
>>>>>> name
>>>>>>> for this
>>>>>>> operator -- `cancelRepartition()` -- is itself a sign that this
>>>>>>> is not
>>>> an
>>>>>>> operation on the
>>>>>>> stream itself but rather a command/request to whichever operator
>>>>>>> would
>>>>>> have
>>>>>>> otherwise triggered this repartition.
>>>>>>>
>>>>>>> What about instead adding a new field to the
>>>> Grouped/Joined/StreamJoined
>>>>>>> config
>>>>>>> objects that signals them to skip the repartitioning?
>>>>>>>
>>>>>>> The one downside to this specific proposal is that you would then
>>>>>>> need
>>>> to
>>>>>>> specify
>>>>>>> this for every stateful operation downstream of the key-changing
>>>>>> operation.
>>>>>>> So a
>>>>>>> better alternative might be to introduce this `skipRepartition`
>>>>>>> field,
>>>> or
>>>>>>> whatever we
>>>>>>> want to call it, to the config object of the operator that's
>>>>>>> actually
>>>>>> doing
>>>>>>> the key
>>>>>>> changing operation which is apparently preserving the partitioning.
>>>>>>>
>>>>>>> Imo this would be more "safe" relative to the current proposal,
>>>>>>> as the
>>>>>> user
>>>>>>> has to
>>>>>>> explicitly consider whether every key changing operation is indeed
>>>>>>> preserving the
>>>>>>> partitioning. Otherwise you could code up a topology with several
>>>>>>> key
>>>>>>> changing
>>>>>>> operations at the beginning which do require repartitioning. Then
>>>>>>> you
>>>> get
>>>>>>> to the end
>>>>>>> of the topology and insert one final key changing operation that
>>>> doesn't,
>>>>>>> assume
>>>>>>> you can just cancel the repartition, and suddenly you're
>>>>>>> wondering why
>>>>>> your
>>>>>>> results
>>>>>>> are all screwed up
>>>>>>>
>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>
>>>>>>>> I think it's a good feature to give advanced users more control,
>>>>>>>> and
>>>>>>>> allow them to build more efficient applications.
>>>>>>>>
>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>> things" discussion :))
>>>>>>>>
>>>>>>>> Did you consider alternatives? What about
>>>>>>>>
>>>>>>>>     - markAsPartitioned()
>>>>>>>>     - markAsKeyed()
>>>>>>>>     - skipRepartition()
>>>>>>>>
>>>>>>>> Not sure if there are other ideas on a good name?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>
>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Ivan
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
> 
