[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364919#comment-15364919 ]
Bill Bejeck commented on KAFKA-3101:
------------------------------------

Is this available now? If so, I'd like to pick this up if possible.

> Optimize Aggregation Outputs
> ----------------------------
>
>                 Key: KAFKA-3101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table /
> Windowed Stream Aggregations. For example, say we have a sequence of
> aggregate outputs computed from the input stream (assuming there is no agg
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue,
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results.
> Instead, if we can let the underlying state store "remember" the last
> emitted old value, we can reduce the number of emits based on some configs.
> More specifically, we can add one more field in the KV store engine storing
> the last emitted old value, which only gets updated when we emit to the
> downstream processor. For example:
> At Beginning:
> Store: key => empty (no agg values yet)
> V1 computed:
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed:
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed:
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed:
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed:
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the
> not-yet-emitted keys; when some time has elapsed (or the window is about to
> be closed), we need to check, for each key, whether its current materialized
> pair has not yet been emitted (for example <V5, V4> in the above example).
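
To make the store behaviour in the description concrete, here is a minimal sketch of a store that keeps the current value and the last emitted value per key. It is only an illustration under stated assumptions: the names DedupingAggStore, Change, update, flush and the "emit every N-th update" policy (emitEvery) are hypothetical and are not Kafka Streams APIs; the ticket only says emits are reduced "based on some configs".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names come from Kafka Streams. */
class DedupingAggStore<K, V> {

    /** Mirrors the Change<newValue, oldValue> pair from the description. */
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }

        @Override
        public String toString() {
            return "<" + newValue + ", " + oldValue + ">";
        }
    }

    /** Per-key state: current aggregate plus the value last sent downstream. */
    private static final class Entry<V> {
        V current;
        V lastEmitted;
        int pending; // updates applied since the last emit
    }

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final int emitEvery; // assumed policy: emit on every N-th update per key

    DedupingAggStore(int emitEvery) {
        this.emitEvery = emitEvery;
    }

    /** Records a new aggregate; returns the Change to emit, or null to suppress. */
    Change<V> update(K key, V newValue) {
        Entry<V> e = store.get(key);
        if (e == null) {
            // First value for this key: update both fields and emit <new, null>.
            e = new Entry<>();
            e.current = newValue;
            e.lastEmitted = newValue;
            store.put(key, e);
            return new Change<>(newValue, null);
        }
        e.current = newValue; // always update the new value
        e.pending++;
        if (e.pending >= emitEvery) {
            // Emit against the last emitted value, then remember this emit.
            Change<V> change = new Change<>(newValue, e.lastEmitted);
            e.lastEmitted = newValue;
            e.pending = 0;
            return change;
        }
        return null; // no emit
    }

    /** "Closing" pass: emit every key that still has un-emitted updates. */
    List<Change<V>> flush() {
        List<Change<V>> out = new ArrayList<>();
        for (Entry<V> e : store.values()) {
            if (e.pending > 0) {
                out.add(new Change<>(e.current, e.lastEmitted));
                e.lastEmitted = e.current;
                e.pending = 0;
            }
        }
        return out;
    }
}
```

With emitEvery set to 3, feeding V1 through V5 for one key follows the walk-through above: V1 and V4 are emitted, V2, V3 and V5 are held back, and flush() then emits the remaining <V5, V4>. A real implementation would presumably trigger the closing pass from a timer or window close rather than an explicit call.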
-- This message was sent by Atlassian JIRA (v6.3.4#6332)