[ https://issues.apache.org/jira/browse/KAFKA-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-3081.
----------------------------------
    Resolution: Fixed

Issue resolved by pull request 761
[https://github.com/apache/kafka/pull/761]

> KTable Aggregation Implementation
> ---------------------------------
>
>                 Key: KAFKA-3081
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3081
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.9.1.0
>
>
> We need to implement the KTable aggregation operation. It will be
> translated into two stages in the underlying topology:
> Stage One:
> 1. No stores attached.
> 2. When receiving a record <K, Change<V>> from the upstream processor, call
> selector.apply on both Change<V>.newValue and Change<V>.oldValue.
> 3. Forward the two resulting messages to an intermediate topic (no compaction)
> with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean.
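Stage One can be sketched in plain Java as a stateless function with no Kafka dependency. This is a minimal sketch under stated assumptions, not the code merged in the pull request: StageOneSketch, Change and Repartitioned are hypothetical stand-ins, the selector is assumed to take the original key and value and return the <agg-key, selected-value> pair, and the intermediate topic is modeled as the returned list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class StageOneSketch {
    // Hypothetical stand-in for Change<V>: the value after (newValue) and
    // before (oldValue) an update to a KTable row; either side may be null.
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Hypothetical shape of a record sent to the intermediate topic:
    // key <agg-key>, value <selected-value, isAdd>.
    static final class Repartitioned<K1, V1> {
        final K1 aggKey;
        final V1 selectedValue;
        final boolean isAdd;
        Repartitioned(K1 aggKey, V1 selectedValue, boolean isAdd) {
            this.aggKey = aggKey;
            this.selectedValue = selectedValue;
            this.isAdd = isAdd;
        }
        @Override public String toString() {
            return aggKey + " -> <" + selectedValue + ", " + isAdd + ">";
        }
    }

    // Stage One: no store; apply the (assumed) selector to both sides of the change.
    // The old value becomes a removal (isAdd = false), the new value an addition
    // (isAdd = true). A null side (row inserted or deleted) produces no message.
    // Sending the removal first is a choice of this sketch, not of the ticket.
    static <K, V, K1, V1> List<Repartitioned<K1, V1>> process(
            K key, Change<V> change, BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        List<Repartitioned<K1, V1>> out = new ArrayList<>();
        if (change.oldValue != null) {
            Map.Entry<K1, V1> sel = selector.apply(key, change.oldValue);
            out.add(new Repartitioned<>(sel.getKey(), sel.getValue(), false));
        }
        if (change.newValue != null) {
            Map.Entry<K1, V1> sel = selector.apply(key, change.newValue);
            out.add(new Repartitioned<>(sel.getKey(), sel.getValue(), true));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical example: regroup a user -> city table by city.
        BiFunction<String, String, Map.Entry<String, String>> byCity =
                (user, city) -> Map.entry(city, user);
        // alice moves from SF to NY
        System.out.println(process("alice", new Change<>("NY", "SF"), byCity));
        // prints [SF -> <alice, false>, NY -> <alice, true>]
    }
}
```

Forwarding the old value as a removal and the new value as an addition lets a row that moves between groups decrement one aggregate and increment another, which is why both sides of the Change go to the intermediate topic.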
> Stage Two:
> 1. Add a K-V store with format <agg-key> : <agg-value>, using the <K1>
> ser-de and the <T> ser-de.
> 2. Upon consuming a record from the intermediate topic:
> 2.1. First try to fetch the aggregate from the store; if it does not exist,
> call initialValue().
> 2.2. Based on isAdd, call either add(..) or remove(..).
> 2.3. Periodically, based on the emit duration, forward the aggregate value to
> the sink node with the intermediate topic, with key <agg-key> and value
> Change<agg-value>.
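Stage Two can likewise be sketched with an in-memory HashMap standing in for the K-V store. Again this is a hedged sketch, not the code from pull request 761: StageTwoSketch, its Aggregator and Emitted types, and the maybeEmit(nowMs) method are hypothetical names, serialization (the <K1> and <T> ser-des) is omitted, and the emit duration is assumed to be checked against a caller-supplied timestamp. Each emitted Change<agg-value> pairs the current aggregate with the value last forwarded for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StageTwoSketch<K1, V1, T> {
    // Hypothetical aggregator exposing the initialValue()/add(..)/remove(..)
    // calls named in the ticket; not the actual Kafka Streams interface.
    interface Aggregator<K, V, A> {
        A initialValue();
        A add(K aggKey, V value, A aggregate);
        A remove(K aggKey, V value, A aggregate);
    }

    // Forwarded record: key <agg-key>, value Change<agg-value> as (new, old).
    static final class Emitted<K, A> {
        final K aggKey;
        final A newValue;
        final A oldValue; // null if this key has not been emitted before
        Emitted(K aggKey, A newValue, A oldValue) {
            this.aggKey = aggKey;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
        @Override public String toString() {
            return aggKey + " -> Change(" + newValue + ", " + oldValue + ")";
        }
    }

    private final Map<K1, T> store = new HashMap<>();       // in-memory stand-in for the K-V store
    private final Map<K1, T> lastEmitted = new HashMap<>(); // last value forwarded per agg-key
    private final Set<K1> dirty = new LinkedHashSet<>();     // agg-keys updated since the last emit
    private final Aggregator<K1, V1, T> aggregator;
    private final long emitDurationMs;
    private long lastEmitMs;

    StageTwoSketch(Aggregator<K1, V1, T> aggregator, long emitDurationMs, long startMs) {
        this.aggregator = aggregator;
        this.emitDurationMs = emitDurationMs;
        this.lastEmitMs = startMs;
    }

    // Steps 2.1 and 2.2 for one record consumed from the intermediate topic.
    void process(K1 aggKey, V1 selectedValue, boolean isAdd) {
        T current = store.containsKey(aggKey) ? store.get(aggKey) : aggregator.initialValue();
        T updated = isAdd
                ? aggregator.add(aggKey, selectedValue, current)
                : aggregator.remove(aggKey, selectedValue, current);
        store.put(aggKey, updated);
        dirty.add(aggKey);
    }

    // Step 2.3: once emitDurationMs has elapsed, forward each agg-key updated
    // since the previous emit; otherwise forward nothing.
    List<Emitted<K1, T>> maybeEmit(long nowMs) {
        List<Emitted<K1, T>> out = new ArrayList<>();
        if (nowMs - lastEmitMs < emitDurationMs) {
            return out;
        }
        for (K1 key : dirty) {
            T newValue = store.get(key);
            out.add(new Emitted<>(key, newValue, lastEmitted.get(key)));
            lastEmitted.put(key, newValue);
        }
        dirty.clear();
        lastEmitMs = nowMs;
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical example: count users per city.
        Aggregator<String, String, Long> count = new Aggregator<String, String, Long>() {
            public Long initialValue() { return 0L; }
            public Long add(String city, String user, Long agg) { return agg + 1; }
            public Long remove(String city, String user, Long agg) { return agg - 1; }
        };
        StageTwoSketch<String, String, Long> stage = new StageTwoSketch<>(count, 1000L, 0L);
        stage.process("SF", "alice", true);
        stage.process("NY", "bob", true);
        System.out.println(stage.maybeEmit(1000L)); // prints [SF -> Change(1, null), NY -> Change(1, null)]
        stage.process("SF", "alice", false);
        stage.process("NY", "alice", true);
        System.out.println(stage.maybeEmit(2000L)); // prints [SF -> Change(0, 1), NY -> Change(2, 1)]
    }
}
```

Buffering updated keys until the emit duration elapses means that several updates to the same agg-key within one period collapse into a single forwarded Change, at the cost of delaying downstream visibility by up to one period.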



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
