Guozhang Wang created KAFKA-3081:
------------------------------------

             Summary: KTable Aggregation Implementation
                 Key: KAFKA-3081
                 URL: https://issues.apache.org/jira/browse/KAFKA-3081
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Guozhang Wang
            Assignee: Guozhang Wang
             Fix For: 0.9.1.0


We need to add the implementation of the KTable aggregation operation. We will 
translate it into two stages in the underlying topology:

Stage One:
1. No stores attached.

2. When receiving the record <K, Change<V>> from the upstream processor, call 
selector.apply on both Change<V>.newValue and Change<V>.oldValue.

3. Forward the two resulting messages to an intermediate topic (no compaction) 
with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean.
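
As a rough illustration of Stage One, here is a minimal sketch in plain Java. 
It is not the Kafka Streams implementation: StageOneSketch, Change, Out and 
process are hypothetical names, and the selector is assumed to map <K, V> to 
an <agg-key, selected-value> pair. The sketch also assumes that a null old or 
new value (first insert, or delete) produces no message, and it emits the 
retraction before the addition; the description above does not specify either.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Stage One sketch: stateless re-keying of one KTable change. All names are hypothetical. */
final class StageOneSketch {

    /** Old/new value pair for one key; either side may be null (insert or delete). */
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Record for the intermediate topic: <agg-key> -> <selected-value, isAdd>. */
    static final class Out<K1, V1> {
        final K1 aggKey;
        final V1 selectedValue;
        final boolean isAdd;

        Out(K1 aggKey, V1 selectedValue, boolean isAdd) {
            this.aggKey = aggKey;
            this.selectedValue = selectedValue;
            this.isAdd = isAdd;
        }
    }

    /** Applies the selector to both sides of the change; no state store is involved. */
    static <K, V, K1, V1> List<Out<K1, V1>> process(
            K key, Change<V> change, BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        List<Out<K1, V1>> out = new ArrayList<>();
        if (change.oldValue != null) { // assumption: nothing to retract on a first insert
            Map.Entry<K1, V1> e = selector.apply(key, change.oldValue);
            out.add(new Out<>(e.getKey(), e.getValue(), false));
        }
        if (change.newValue != null) { // assumption: nothing to add on a delete
            Map.Entry<K1, V1> e = selector.apply(key, change.newValue);
            out.add(new Out<>(e.getKey(), e.getValue(), true));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical selector: group integers by parity, keep the value itself.
        BiFunction<String, Integer, Map.Entry<String, Integer>> byParity =
                (k, v) -> new SimpleImmutableEntry<>(v % 2 == 0 ? "even" : "odd", v);
        for (Out<String, Integer> o : process("alice", new Change<>(4, 3), byParity)) {
            System.out.println(o.aggKey + " -> <" + o.selectedValue + ", " + o.isAdd + ">");
        }
    }
}
```

Note that an update which moves a record from one agg-key to another yields 
two messages with different keys, which is why the old and new values are 
selected separately.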

Stage Two:

1. Add a K-V store with format <agg-key> : <agg-value> with <K1> ser-de and <T> 
ser-de.

2. Upon consuming a record from the intermediate topic:
2.1. First try to fetch the current aggregate from the store; if it does not 
exist, call initialValue().
2.2. Based on "isAdd", call either add(..) or remove(..).
2.3. Periodically, based on the emit duration, forward the aggregate value to 
the sink node with the intermediate topic, with key <agg-key> and value 
Change<agg-value>.
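
Stage Two could look roughly like the following plain-Java sketch. It is only 
an illustration under several assumptions: StageTwoSketch, Aggregator, Emitted 
and emit are hypothetical names, a HashMap stands in for the K-V store, and 
the parameter lists of add and remove (elided above) are guesses. The sketch 
further assumes that the old value of the forwarded Change is the aggregate 
that was last forwarded for that key, and that the caller invokes emit() once 
per emit duration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Stage Two sketch: stateful aggregation per agg-key. All names are hypothetical. */
final class StageTwoSketch<K1, V1, T> {

    /** Assumed shape of the user-supplied aggregator; add/remove parameters are guesses. */
    interface Aggregator<V, A> {
        A initialValue();
        A add(V value, A aggregate);
        A remove(V value, A aggregate);
    }

    /** Record forwarded downstream: <agg-key> -> Change<agg-value>. */
    static final class Emitted<K, A> {
        final K aggKey;
        final A newValue;
        final A oldValue; // null if nothing was forwarded for this key before

        Emitted(K aggKey, A newValue, A oldValue) {
            this.aggKey = aggKey;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    private final Aggregator<V1, T> aggregator;
    private final Map<K1, T> store = new HashMap<>();         // stand-in for the K-V store
    private final Map<K1, T> lastForwarded = new HashMap<>(); // assumption: source of oldValue
    private final Set<K1> dirty = new LinkedHashSet<>();      // keys changed since last emit

    StageTwoSketch(Aggregator<V1, T> aggregator) {
        this.aggregator = aggregator;
    }

    /** Handles one record consumed from the intermediate topic (steps 2.1 and 2.2). */
    void process(K1 aggKey, V1 selectedValue, boolean isAdd) {
        T current = store.get(aggKey);
        if (current == null) {
            current = aggregator.initialValue();
        }
        T updated = isAdd
                ? aggregator.add(selectedValue, current)
                : aggregator.remove(selectedValue, current);
        store.put(aggKey, updated);
        dirty.add(aggKey);
    }

    /** Step 2.3: returns one change per key updated since the previous call. */
    List<Emitted<K1, T>> emit() {
        List<Emitted<K1, T>> out = new ArrayList<>();
        for (K1 key : dirty) {
            T now = store.get(key);
            out.add(new Emitted<>(key, now, lastForwarded.put(key, now)));
        }
        dirty.clear();
        return out;
    }
}
```

With a sum aggregator (initialValue() returns 0, add adds, remove subtracts), 
adding 3 and 5 and then removing 3 for the same agg-key leaves an aggregate 
of 5, and only that final value is forwarded at the next emit.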




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
