[
https://issues.apache.org/jira/browse/KAFKA-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743293#comment-16743293
]
John Roesler commented on KAFKA-7294:
-------------------------------------
To clarify, we assume a priori that all inputs are partitioned according to
their keys (i.e., all records for a given key k_i are in the same partition
p_j). If an operator might change a key (for example, a `groupBy` changes
the key of a record, and a `map` might also change it), then we need
to "repartition", that is, send the result to an intermediate topic (where it
will be partitioned by its new key) and then read it back out for the next
operation.
Consider two partitions with some k/v records in them: p1=<{x:a},
{x:b}> and p2=<{y:a}, {y:c}>, and suppose we do a simple key-changing operation
where we invert the key and the value. Before repartitioning, we'd have
p1=<{a:x}, {b:x}> and p2=<{a:y}, {c:y}>. Note that the key "a" is now split
over both partitions, so we have lost the property that all records for a given
key are in just one partition.
This is important because Streams treats each partition as an independent
bucket of work for scalability: p1 and p2 can be assigned to different threads
or machines. If you were to do a stateful operation, like counting events per
key of that last result, p1 and p2 would both independently report {a:1},
instead of the correct answer {a:2}.
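The miscount can be reproduced with a small toy model. This is only a sketch in plain Python, not Kafka Streams code; the helper names `invert` and `count_per_key` are made up for illustration, and each list stands in for one partition processed by its own task.

```python
from collections import Counter

# Two input partitions of (key, value) records, as in the example above.
p1 = [("x", "a"), ("x", "b")]
p2 = [("y", "a"), ("y", "c")]

def invert(partition):
    """Key-changing operation: swap key and value; records stay in their partition."""
    return [(value, key) for key, value in partition]

def count_per_key(partition):
    """Stateful operation run by a task that only sees its own partition."""
    return Counter(key for key, _ in partition)

counts_p1 = count_per_key(invert(p1))
counts_p2 = count_per_key(invert(p2))

# Each task sees only one of the two "a" records.
print(counts_p1["a"], counts_p2["a"])  # prints: 1 1
```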
To prevent this, after that "invert the key and value", we'd send the results
to an intermediate topic, where the records are assigned to partitions by
hashing their keys. This has the effect of collecting all the records for "a"
into just one partition again, something like this: q1=<{a:x}, {a:y}> and
q2=<{b:x}, {c:y}>. Now the aggregation worker that gets assigned q1 will be
able to correctly count the records to get {a:2}.
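As a rough illustration of the repartition step, the sketch below routes each inverted record to a partition chosen by hashing its key. It is a toy model in plain Python: `partition_for` and `repartition` are hypothetical helpers, and `zlib.crc32` is only a deterministic stand-in for the hash a real partitioner would use, so the exact placement of keys will differ from the q1/q2 layout above.

```python
import zlib
from collections import Counter

def partition_for(key, num_partitions):
    # Stand-in hash (assumption); chosen only because it is deterministic across runs.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def repartition(partitions, num_partitions):
    """Simulate writing to an intermediate topic keyed by the new key."""
    out = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for key, value in partition:
            out[partition_for(key, num_partitions)].append((key, value))
    return out

# Start from the original inputs and invert key and value, as in the example.
inputs = [[("x", "a"), ("x", "b")], [("y", "a"), ("y", "c")]]
inverted = [[(v, k) for k, v in part] for part in inputs]

q = repartition(inverted, 2)
counts = [Counter(key for key, _ in part) for part in q]

# Exactly one partition holds "a", and its task counts both records.
counts_for_a = [c["a"] for c in counts if "a" in c]
print(counts_for_a)  # prints: [2]
```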
However, there is an optimization that we employ: only *stateful* operators
need to be partitioned like this. *Stateless* operators, like `mapValues` or
`filter` just operate record-by-record, so we can save on repartitioning by
delaying the repartition step until just before the next *stateful* operator.
For example, after inverting the keys and values above, we might do a
`mapValues` to convert each value into its ASCII code to get
p1=<{a:120}, {b:120}> and p2=<{a:121}, {c:121}>. The fact that "a" is split
over two partitions doesn't affect the operation. If this were the end of the
program, we would avoid the repartition altogether.
This is why, instead of repartitioning as soon as we alter the keys, we just
mark the stream "repartition required" and then actually do the repartitioning
later on, before the next stateful operator _if there is one_.
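The "mark now, repartition later" idea can be sketched as a flag carried along the stream. The model below is hypothetical plain Python, not the Streams implementation: the `Stream` class and the `select_key`, `map_values`, and `count` functions are invented names that only track the flag and how many repartition steps were inserted.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Stream:
    name: str
    repartition_required: bool = False
    repartitions: int = 0  # number of repartition steps inserted so far

def select_key(s):
    """Key-changing operator: only marks the stream, does not repartition."""
    return Stream(s.name + ".selectKey", True, s.repartitions)

def map_values(s):
    """Stateless, key-preserving operator: the flag passes through unchanged."""
    return Stream(s.name + ".mapValues", s.repartition_required, s.repartitions)

def count(s):
    """Stateful operator: repartitions first, but only if the flag is set."""
    added = 1 if s.repartition_required else 0
    return Stream(s.name + ".count", False, s.repartitions + added)

source = Stream("source")
stateless_only = map_values(select_key(source))  # program ends here: no repartition
counted = count(stateless_only)                  # stateful op forces exactly one

print(stateless_only.repartitions, counted.repartitions)  # prints: 0 1
```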
Now, we come to the "merge" operator. It essentially collates two streams, so
it's not a stateful operator. Also, it doesn't change any keys, so by itself it
doesn't create a need to repartition. If both inputs are correctly partitioned,
then the output is also correctly partitioned. If either or both of the input
streams is marked "repartition required", then the output is also "repartition
required". So far, this is what we have today.
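In other words, the flag on the merged stream is the logical OR of the input flags. A one-function sketch in plain Python (the function name is made up for illustration):

```python
def merge_requires_repartition(*input_flags):
    """The merged stream is marked if any input stream is marked."""
    return any(input_flags)

print(merge_requires_repartition(False, False))  # prints: False
print(merge_requires_repartition(False, True))   # prints: True
print(merge_requires_repartition(True, True))    # prints: True
```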
Matthias's observation is this: suppose we merge two streams A and B, of sizes
|A| and |B|, where only B needs repartitioning. If we wind up repartitioning
the merge result later on, we repartition |A|+|B| worth of data. If we instead
detect that A is already correctly partitioned and repartition B just before
the merge, we repartition only |B| worth of data, saving on broker storage,
network traffic, processing latency, etc.
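To make the saving concrete, here is a small arithmetic sketch. The function name and the stream sizes are made-up illustration values, not measurements.

```python
def records_repartitioned(size_a, size_b, push_before_merge):
    """A is correctly partitioned; only B is marked "repartition required"."""
    if push_before_merge:
        return size_b           # only B goes through a repartition topic
    return size_a + size_b      # the whole merge result is repartitioned

size_a, size_b = 1_000_000, 10_000  # hypothetical sizes
after_merge = records_repartitioned(size_a, size_b, push_before_merge=False)
before_merge = records_repartitioned(size_a, size_b, push_before_merge=True)

print(after_merge, before_merge)  # prints: 1010000 10000
```

The larger A is relative to B, the more the pushed-down placement saves.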
But the catch is that we still don't want to repartition B unnecessarily. That
is, if there are no stateful operators downstream of the merge, then we
shouldn't repartition B after all.
Implementing this feature requires using our (relatively new) optimization
framework. After the whole topology graph is put together, we need to analyse
it and essentially "push" the repartition operation to the optimal location.
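A minimal sketch of the decision such an analysis would have to make is shown below. It is not the optimizer's actual code: `plan_repartition` is a hypothetical function, and it assumes all merge inputs are co-partitioned, as the issue description requires.

```python
def plan_repartition(inputs, stateful_downstream):
    """Decide which merge inputs to repartition before the merge.

    inputs: dict mapping an input stream name to its "repartition required" flag.
    stateful_downstream: whether any stateful operator follows the merge.
    Assumes all inputs are co-partitioned (same partition count).
    """
    if not stateful_downstream:
        return []  # nothing downstream needs correct partitioning
    return sorted(name for name, required in inputs.items() if required)

inputs = {"A": False, "B": True}
print(plan_repartition(inputs, stateful_downstream=True))   # prints: ['B']
print(plan_repartition(inputs, stateful_downstream=False))  # prints: []
```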
> Optimize repartitioning for merge()
> -----------------------------------
>
> Key: KAFKA-7294
> URL: https://issues.apache.org/jira/browse/KAFKA-7294
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> For a merge() operator we check at compile time, if one of the input KStreams
> requires repartitioning and set the "requiresRepartitioning" flag for the
> output KStream for this case. This implies, that we pipe data from all input
> KStreams through the repartition topic after the merge().
> Using our optimizer, we could push down the repartition operation before the
> merge() to only repartition the KStream(s) that required repartition and thus
> save network IO for all KStreams that don't require repartitioning.
> Note, that this operation is only correct, if all input streams are
> co-partitioned (cf. KAFKA-7293).