[ 
https://issues.apache.org/jira/browse/KAFKA-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743293#comment-16743293
 ] 

John Roesler commented on KAFKA-7294:
-------------------------------------

To clarify, we assume a priori that all inputs are partitioned according to 
their keys (i.e., all records for a given key k_i are in the same partition 
p_j). If an operator might change a key (for example, a `groupBy` changes 
the key of a record, and a `map` might change it as well), then we need 
to "repartition", that is, send the result to an intermediate topic (where it 
will be partitioned by its new key) and then read it back out for the next 
operation.

Consider two partitions with some k/v records in them: p1=<{x:a}, {x:b}> and 
p2=<{y:a}, {y:c}>, and a simple key-changing operation that swaps the key and 
the value of each record. Before repartitioning, we'd have 
p1=<{a:x}, {b:x}> and p2=<{a:y}, {c:y}>. Note that the key "a" is now split 
over both partitions, so we have lost the property that all records for a given 
key are in just one partition.

This is important because Streams treats each partition as an independent 
bucket of work for scalability: p1 and p2 can be assigned to different threads 
or machines. If you were to do a stateful operation on that last result, like 
counting events per key, p1 and p2 would both independently report {a:1}, 
instead of the correct answer {a:2}.
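
To make the miscount concrete, here is a toy sketch in plain Java. Lists stand 
in for partitions, and `SplitKeyDemo`, `invert` and `countPerKey` are made-up 
names for illustration, not Kafka Streams API. Each partition is counted on its 
own, the way independent workers would see it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a partition is a list of {key, value} pairs.
class SplitKeyDemo {
    // Swap key and value of every record; records stay in their partition.
    static List<String[]> invert(List<String[]> partition) {
        List<String[]> out = new ArrayList<>();
        for (String[] kv : partition) {
            out.add(new String[] {kv[1], kv[0]});
        }
        return out;
    }

    // Count records per key, looking at a single partition only.
    static Map<String, Integer> countPerKey(List<String[]> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] kv : partition) {
            counts.merge(kv[0], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> p1 = List.of(new String[] {"x", "a"}, new String[] {"x", "b"});
        List<String[]> p2 = List.of(new String[] {"y", "a"}, new String[] {"y", "c"});
        // Each worker sees only its own partition, so "a" is counted twice as 1.
        System.out.println(countPerKey(invert(p1))); // {a=1, b=1}
        System.out.println(countPerKey(invert(p2))); // {a=1, c=1}
    }
}
```

Neither worker ever sees both "a" records, so neither can report {a:2}.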

To prevent this, after that "invert the key and value", we'd send the results 
to an intermediate topic, where the records are assigned to partitions by 
hashing their keys. This has the effect of collecting all the records for "a" 
into just one partition again, something like this: q1=<{a:x}, {a:y}> and 
q2=<{b:x}, {c:y}>. Now the aggregation worker that gets assigned q1 will be 
able to correctly count the records to get {a:2}.
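
The repartition step can be sketched the same way. This is only a toy model: 
`RepartitionDemo` is a made-up name, and `String.hashCode()` stands in for the 
hash a real producer would apply to the serialized key, so the exact layout of 
the output partitions may differ from q1/q2 above. The property that matters is 
that every record for a given key lands in the same partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: simulates writing to and reading from a repartition topic.
class RepartitionDemo {
    // Route every record to partition hash(key) mod numPartitions.
    // Assumption: String.hashCode() is a stand-in for the real key hash.
    static List<List<String[]>> repartition(List<List<String[]>> input, int numPartitions) {
        List<List<String[]>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (List<String[]> partition : input) {
            for (String[] kv : partition) {
                int target = Math.floorMod(kv[0].hashCode(), numPartitions);
                out.get(target).add(kv);
            }
        }
        return out;
    }

    // Count records per key, looking at a single partition only.
    static Map<String, Integer> countPerKey(List<String[]> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] kv : partition) {
            counts.merge(kv[0], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The inverted (not yet repartitioned) data from the example.
        List<List<String[]>> inverted = List.of(
            List.of(new String[] {"a", "x"}, new String[] {"b", "x"}),
            List.of(new String[] {"a", "y"}, new String[] {"c", "y"}));
        for (List<String[]> q : repartition(inverted, 2)) {
            // Each key now shows up in exactly one of the printed maps.
            System.out.println(countPerKey(q));
        }
    }
}
```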

However, there is an optimization that we employ: only *stateful* operators 
need to be partitioned like this. *Stateless* operators, like `mapValues` or 
`filter` just operate record-by-record, so we can save on repartitioning by 
delaying the repartition step until just before the next *stateful* operator. 
For example, after inverting the keys and values above, we might do a 
`mapValues` to convert each value into its ASCII code to get 
p1=<{a:120}, {b:120}> and p2=<{a:121}, {c:121}>. The fact that "a" is split 
over two partitions doesn't affect the operation. If this were the end of the 
program, we would avoid the repartition altogether.

This is why, instead of repartitioning as soon as we alter the keys, we just 
mark the stream "repartition required" and then actually do the repartitioning 
later on, before the next stateful operator _if there is one_.
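
As a rough sketch of that bookkeeping (a toy model, not the actual Streams 
internals; `Node`, `changeKey`, `statelessOp` and `statefulOp` are hypothetical 
names), the flag is set by key-changing operators, carried along unchanged by 
stateless ones, and only turned into a real repartition step by a stateful one:

```java
// Toy model only: tracks the "repartition required" mark through a chain of operators.
class LazyRepartitionSketch {
    static final class Node {
        final boolean repartitionRequired; // the mark described above
        final int repartitionTopics;       // repartition steps actually inserted so far

        Node(boolean repartitionRequired, int repartitionTopics) {
            this.repartitionRequired = repartitionRequired;
            this.repartitionTopics = repartitionTopics;
        }

        // A source is assumed to be correctly partitioned by key.
        static Node source() {
            return new Node(false, 0);
        }

        // Key-changing operator: only mark the stream, do not repartition yet.
        Node changeKey() {
            return new Node(true, repartitionTopics);
        }

        // Stateless operator (mapValues, filter, ...): the mark passes through.
        Node statelessOp() {
            return new Node(repartitionRequired, repartitionTopics);
        }

        // Stateful operator: repartition first if marked, then clear the mark.
        Node statefulOp() {
            int topics = repartitionRequired ? repartitionTopics + 1 : repartitionTopics;
            return new Node(false, topics);
        }
    }

    public static void main(String[] args) {
        Node marked = Node.source().changeKey().statelessOp();
        System.out.println(marked.repartitionRequired + " " + marked.repartitionTopics); // true 0
        Node counted = marked.statefulOp();
        System.out.println(counted.repartitionRequired + " " + counted.repartitionTopics); // false 1
    }
}
```

If the program ends after `statelessOp()`, no repartition topic is ever created.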

Now, we come to the "merge" operator. It essentially collates two streams, so 
it's not a stateful operator. Also, it doesn't change any keys, so by itself it 
doesn't create a need to repartition. If both inputs are correctly partitioned, 
then the output is also correctly partitioned. If either or both of the input 
streams is marked "repartition required", then the output is also "repartition 
required". So far, this is what we have today.

Matthias's observation is that if we merge two streams A and B, of sizes |A| 
and |B|, where only B needs repartitioning, and we later repartition the merge 
result, we end up repartitioning |A|+|B| records. If we instead detected that 
A is already correctly partitioned and repartitioned B just before the merge, 
we would only repartition |B| records, saving on broker storage, network 
traffic, processing latency, etc.

But the catch is that we still don't want to repartition B unnecessarily. That 
is, if there are no stateful operators downstream of the merge, then we 
shouldn't repartition B after all.
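
The trade-off can be written down as a small cost model. This is a sketch 
under simplifying assumptions: cost is just the number of records sent through 
a repartition topic, and `MergeRepartitionCost` and its method names are 
invented for illustration.

```java
// Toy cost model: how many records go through a repartition topic.
class MergeRepartitionCost {
    // Today's behaviour: the merged stream inherits the mark from either input
    // and is repartitioned as a whole before a downstream stateful operator.
    static long repartitionAfterMerge(long sizeA, boolean aMarked,
                                      long sizeB, boolean bMarked,
                                      boolean statefulDownstream) {
        boolean mergedMarked = aMarked || bMarked;
        return (mergedMarked && statefulDownstream) ? sizeA + sizeB : 0;
    }

    // Proposed behaviour: repartition only the marked inputs before the merge,
    // and still only when a stateful operator follows the merge.
    static long repartitionBeforeMerge(long sizeA, boolean aMarked,
                                       long sizeB, boolean bMarked,
                                       boolean statefulDownstream) {
        if (!statefulDownstream) {
            return 0;
        }
        return (aMarked ? sizeA : 0) + (bMarked ? sizeB : 0);
    }

    public static void main(String[] args) {
        // A is large and already partitioned; B is small and marked.
        System.out.println(repartitionAfterMerge(1000, false, 10, true, true));   // 1010
        System.out.println(repartitionBeforeMerge(1000, false, 10, true, true));  // 10
        // No stateful operator downstream: nothing is repartitioned either way.
        System.out.println(repartitionBeforeMerge(1000, false, 10, true, false)); // 0
    }
}
```

When both inputs are marked the two strategies move the same number of records, 
so the saving only appears when at least one input is already correctly 
partitioned.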

Implementing this feature requires using our (relatively new) optimization 
framework. After the whole topology graph is put together, we need to analyse 
it and essentially "push" the repartition operation to the optimal location.

> Optimize repartitioning for merge()
> -----------------------------------
>
>                 Key: KAFKA-7294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> For a merge() operator we check at compile time whether one of the input 
> KStreams requires repartitioning and, if so, set the "requiresRepartitioning" 
> flag for the output KStream. This implies that we pipe data from all input 
> KStreams through the repartition topic after the merge().
> Using our optimizer, we could push the repartition operation before the 
> merge() to only repartition the KStream(s) that require it and thus 
> save network IO for all KStreams that don't require repartitioning.
> Note that this operation is only correct if all input streams are 
> co-partitioned (cf. KAFKA-7293).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)