Guozhang Wang created KAFKA-3101:
------------------------------------
Summary: Optimize Aggregation Outputs
Key: KAFKA-3101
URL: https://issues.apache.org/jira/browse/KAFKA-3101
Project: Kafka
Issue Type: Sub-task
Reporter: Guozhang Wang
Fix For: 0.10.0.0
Today we emit one output record for each incoming message for Table / Windowed
Stream Aggregations. For example, say we have a sequence of aggregate outputs
computed from the input stream (assuming there is no agg value for this key
before):
V1, V2, V3, V4, V5
Then the aggregator will output the following sequence of Change<newValue,
oldValue>:
<V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
which could cost a lot of CPU overhead in computing the intermediate results.
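
As a rough illustration of the behavior described above, the following minimal Java sketch forwards every aggregate update as a <newValue, oldValue> pair. The class and method names (PerRecordEmit, changes) are hypothetical and are not part of Kafka Streams; the pairs are rendered as strings only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of today's behavior: one emit per computed aggregate. */
final class PerRecordEmit {
    /** Returns the <newValue, oldValue> pairs emitted when every update is forwarded. */
    static List<String> changes(List<String> aggregates) {
        List<String> out = new ArrayList<>();
        String old = null; // no aggregate exists for this key before the first record
        for (String v : aggregates) {
            out.add("<" + v + ", " + old + ">");
            old = v; // the value just emitted becomes the next oldValue
        }
        return out;
    }
}
```

Five aggregate values produce five emits, which is the cost the proposal aims to reduce.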
Instead, if we let the underlying state store "remember" the last emitted
old value, we can reduce the number of emits based on some configs. More
specifically, we can add one more field in the KV store engine that stores the
last emitted old value, which only gets updated when we emit to the downstream
processor. For example:
At Beginning:
Store: key => empty (no agg values yet)
V1 computed:
Update Both in Store: key => (V1, V1), Emit <V1, null>
V2 computed:
Update NewValue in Store: key => (V2, V1), No Emit
V3 computed:
Update NewValue in Store: key => (V3, V1), No Emit
V4 computed:
Update Both in Store: key => (V4, V4), Emit <V4, V1>
V5 computed:
Update NewValue in Store: key => (V5, V4), No Emit
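
The walkthrough above can be modeled with a small in-memory sketch. Everything here is an assumption made for illustration: the names Change and SuppressingAggStore are hypothetical, the store is a plain HashMap rather than the actual KV store engine, and the emit policy ("emit the first value, then every Nth update per key") is just one possible reading of "based on some configs".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pair sent downstream: (newValue, oldValue). */
final class Change<V> {
    final V newValue;
    final V oldValue;
    Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    @Override public String toString() { return "<" + newValue + ", " + oldValue + ">"; }
}

/** Hypothetical store keeping (current value, last emitted value) per key. */
final class SuppressingAggStore<K, V> {
    private static final class Entry<V> {
        V current;            // latest aggregate, updated on every record
        V lastEmitted;        // updated only when we emit downstream
        int updatesSinceEmit; // number of updates held back since the last emit
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final int emitInterval; // assumed config: emit on every Nth update per key
    final List<Change<V>> emitted = new ArrayList<>(); // stands in for the downstream processor

    SuppressingAggStore(int emitInterval) { this.emitInterval = emitInterval; }

    void update(K key, V newAgg) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            // First aggregate for this key: store it and emit <V1, null>.
            e = new Entry<>();
            entries.put(key, e);
            e.current = newAgg;
            emit(e);
            return;
        }
        e.current = newAgg; // "Update NewValue in Store"
        e.updatesSinceEmit++;
        if (e.updatesSinceEmit >= emitInterval) {
            emit(e); // "Update Both in Store" and emit
        }
    }

    private void emit(Entry<V> e) {
        emitted.add(new Change<>(e.current, e.lastEmitted));
        e.lastEmitted = e.current;
        e.updatesSinceEmit = 0;
    }
}
```

With emitInterval set to 3, feeding V1 through V5 for a single key reproduces the trace above: only <V1, null> and <V4, V1> are emitted, and the store ends with (V5, V4).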
One more thing to consider is that we need a "closing" time control on the
not-yet-emitted keys: when some time has elapsed (or the window is about to be
closed), we need to check every key for a current materialized pair that has
not been emitted (for example, <V5, V4> in the example above) so that it can
be emitted.
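
One way to picture the "closing" check is a flush pass over the store, sketched below. The names MaterializedPair and PendingFlusher are hypothetical, and the sketch assumes a key is pending whenever its current value differs from its last emitted value; a real implementation might track a dirty flag instead. What triggers the flush (a timer or a window close) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical materialized pair for one key: current value and last emitted value. */
final class MaterializedPair<V> {
    V current;
    V lastEmitted;
    MaterializedPair(V current, V lastEmitted) {
        this.current = current;
        this.lastEmitted = lastEmitted;
    }
}

final class PendingFlusher {
    /**
     * Emits <current, lastEmitted> for each key whose current value has not been
     * emitted yet, then marks that value as emitted. The emitted changes are
     * returned as strings to keep the sketch short.
     */
    static <K, V> List<String> flushPending(Map<K, MaterializedPair<V>> store) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<K, MaterializedPair<V>> e : store.entrySet()) {
            MaterializedPair<V> p = e.getValue();
            if (!Objects.equals(p.current, p.lastEmitted)) {
                out.add(e.getKey() + ": <" + p.current + ", " + p.lastEmitted + ">");
                p.lastEmitted = p.current; // nothing is pending for this key any more
            }
        }
        return out;
    }
}
```

For a key holding (V5, V4), the flush emits <V5, V4> and leaves (V5, V5) in the store; a second flush with no new updates emits nothing.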