Guozhang Wang created KAFKA-3101:
------------------------------------

             Summary: Optimize Aggregation Outputs
                 Key: KAFKA-3101
                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Guozhang Wang
             Fix For: 0.10.0.0


Today we emit one output record for each incoming message in Table / Windowed
Stream aggregations. For example, say we have the following sequence of
aggregate values computed from the input stream (assuming this key had no
aggregate value before):

V1, V2, V3, V4, V5

Then the aggregator will output the following sequence of Change<newValue, 
oldValue>:

<V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>

This can incur a lot of CPU overhead computing the intermediate results.
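To make the current cost concrete, here is a minimal Python sketch of today's behavior. It is illustrative only: `Change` and `eager_aggregate` are hypothetical names, not the Kafka Streams API. Every computed aggregate value is forwarded downstream as one <newValue, oldValue> record.

```python
from typing import List, NamedTuple, Optional


class Change(NamedTuple):
    """Hypothetical stand-in for Change<newValue, oldValue>."""
    new_value: str
    old_value: Optional[str]


def eager_aggregate(values: List[str]) -> List[Change]:
    """Emit one Change per computed aggregate value (today's behavior)."""
    emitted: List[Change] = []
    old: Optional[str] = None  # no aggregate value for this key yet
    for new in values:
        emitted.append(Change(new, old))
        old = new  # the value just emitted becomes the next old value
    return emitted


records = eager_aggregate(["V1", "V2", "V3", "V4", "V5"])
```

Five input values produce five downstream records, one per intermediate result.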

Instead, if we let the underlying state store "remember" the last emitted old
value, we can reduce the number of emits based on some configs. More
specifically, we can add one more field to the KV store engine that stores the
last emitted old value, which only gets updated when we emit to the downstream
processor. For example:

At the beginning:
Store: key => empty (no agg values yet)

V1 computed:         
Update Both in Store: key => (V1, V1),     Emit <V1, null>

V2 computed:         
Update NewValue in Store: key => (V2, V1),     No Emit

V3 computed:         
Update NewValue in Store: key => (V3, V1),     No Emit

V4 computed:         
Update Both in Store: key => (V4, V4),     Emit <V4, V1>

V5 computed:         
Update NewValue in Store: key => (V5, V4),     No Emit
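The walkthrough above can be reproduced with a small Python sketch, assuming a hypothetical emit policy of "emit a key's first value, then every third update after the previous emit" (the issue leaves the actual configs open). `DedupStore`, `emit_every`, and the per-entry pending counter are hypothetical and exist only for this illustration; the counter drives the assumed policy and is not part of the proposed extra store field.

```python
from typing import Dict, Optional, Tuple

# One downstream record: (newValue, oldValue), mirroring Change<newValue, oldValue>.
Change = Tuple[str, Optional[str]]


class DedupStore:
    """Sketch of a KV store whose entries hold (new value, last emitted value).

    emit_every is a hypothetical config: emit on a key's first value, then on
    every emit_every-th update after the previous emit.
    """

    def __init__(self, emit_every: int = 3) -> None:
        self.emit_every = emit_every
        # key -> (new value, last emitted value, updates since last emit)
        self.entries: Dict[str, Tuple[str, str, int]] = {}

    def put(self, key: str, new: str) -> Optional[Change]:
        if key not in self.entries:
            # First value: update both fields and emit <new, null>.
            self.entries[key] = (new, new, 0)
            return (new, None)
        _, last_emitted, pending = self.entries[key]
        pending += 1
        if pending >= self.emit_every:
            # Policy says emit: update both fields, send <new, last emitted>.
            self.entries[key] = (new, new, 0)
            return (new, last_emitted)
        # Update only the new value; nothing goes downstream.
        self.entries[key] = (new, last_emitted, pending)
        return None


store = DedupStore(emit_every=3)
emits = [store.put("key", v) for v in ["V1", "V2", "V3", "V4", "V5"]]
```

Feeding V1 through V5 for one key returns `("V1", None)`, `None`, `None`, `("V4", "V1")`, `None` and leaves the key's (new value, last emitted value) pair at (V5, V4), matching the steps above.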

One more thing to consider is that we need a "closing" time control on the
not-yet-emitted keys: when some time has elapsed (or the window is about to
close), we need to check every key for a current materialized pair that has
not been emitted yet (for example <V5, V4> in the example above) and emit it.
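A minimal Python sketch of that closing step, assuming the store is a plain dict of key -> (current value, last emitted value) and that a key counts as "not yet emitted" when the two differ (a per-key dirty flag would be an alternative). `flush_pending` is a hypothetical name, and how it gets scheduled (elapsed time or window close) is left out.

```python
from typing import Dict, List, Optional, Tuple

# key -> (current value, last emitted value); None means never emitted.
Entry = Tuple[str, Optional[str]]


def flush_pending(entries: Dict[str, Entry]) -> List[Tuple[str, str, Optional[str]]]:
    """Emit every key whose current value has not been sent downstream yet.

    Returns (key, newValue, oldValue) records and marks each flushed entry
    as emitted by setting its last emitted value to the current value.
    """
    out: List[Tuple[str, str, Optional[str]]] = []
    for key, (current, last_emitted) in list(entries.items()):
        if current != last_emitted:
            out.append((key, current, last_emitted))
            entries[key] = (current, current)
    return out


entries = {"key": ("V5", "V4"), "other": ("W2", "W2")}
flushed = flush_pending(entries)
```

Here only `key` is flushed as <V5, V4>; a second call right afterwards emits nothing. With the equality check, a key whose value drifted back to the last emitted value is skipped, which is harmless because downstream already holds that value.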



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
