Guozhang Wang created KAFKA-3081:
------------------------------------
Summary: KTable Aggregation Implementation
Key: KAFKA-3081
URL: https://issues.apache.org/jira/browse/KAFKA-3081
Project: Kafka
Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.9.1.0
We need to add the implementation of the KTable aggregation operation. We will
translate it into two stages in the underlying topology:
Stage One:
1. No stores attached.
2. When receiving the record <K, Change<V>> from the upstream processor, call
selector.apply on both Change<V>.newValue and Change<V>.oldValue.
3. Forward the two resulting records to an intermediate topic (no compaction)
with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean.
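The Stage One steps above could look roughly like the following plain-Java
sketch. It does not use any Kafka classes: Change, Outgoing, process and the
BiFunction-based selector are hypothetical stand-ins invented for
illustration. Two points are assumptions not stated in this issue: a null old
or new value produces no record, and the removal record is forwarded before
the addition record.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class StageOneSketch {

    // Hypothetical stand-in for Change<V>: the new and old value of one KTable row.
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Hypothetical record for the intermediate topic:
    // key is the agg-key, value is the pair (selected-value, isAdd).
    static final class Outgoing<K1, V1> {
        final K1 aggKey;
        final V1 selectedValue;
        final boolean isAdd;

        Outgoing(K1 aggKey, V1 selectedValue, boolean isAdd) {
            this.aggKey = aggKey;
            this.selectedValue = selectedValue;
            this.isAdd = isAdd;
        }
    }

    // Stateless step: apply the selector to the old and the new value and
    // return the records that would be forwarded to the intermediate topic.
    // Assumption: a null side of the Change (insert or delete) yields no record.
    static <K, V, K1, V1> List<Outgoing<K1, V1>> process(
            K key, Change<V> change, BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        List<Outgoing<K1, V1>> out = new ArrayList<>();
        if (change.oldValue != null) {
            Map.Entry<K1, V1> old = selector.apply(key, change.oldValue);
            out.add(new Outgoing<>(old.getKey(), old.getValue(), false)); // later drives remove(..)
        }
        if (change.newValue != null) {
            Map.Entry<K1, V1> selected = selector.apply(key, change.newValue);
            out.add(new Outgoing<>(selected.getKey(), selected.getValue(), true)); // later drives add(..)
        }
        return out;
    }

    public static void main(String[] args) {
        // Values are "region:amount"; the selector re-keys each row by region.
        BiFunction<String, String, Map.Entry<String, Integer>> byRegion = (user, v) -> {
            String[] parts = v.split(":");
            return new SimpleEntry<>(parts[0], Integer.valueOf(parts[1]));
        };
        // Row "alice" changes from "eu:10" to "us:25".
        Change<String> change = new Change<>("us:25", "eu:10");
        for (Outgoing<String, Integer> r : process("alice", change, byRegion)) {
            System.out.println(r.aggKey + " : <" + r.selectedValue + ", isAdd=" + r.isAdd + ">");
        }
    }
}
```

Because the old and new values may select different aggregation keys, the two
records can land in different partitions of the intermediate topic, which is
why each one carries its own isAdd flag.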
Stage Two:
1. Add a K-V store mapping <agg-key> to <agg-value>, using the <K1> ser-de for
keys and the <T> ser-de for values.
2. Upon consuming a record from the intermediate topic:
2.1. First try to fetch the current aggregate from the store; if it does not
exist, call initialValue().
2.2. Based on isAdd, call either add(..) or remove(..).
2.3. Periodically, based on the emit duration, forward the aggregate value to
the sink node with the intermediate topic, with key <agg-key> and value
Change<agg-value>.
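The Stage Two steps above could be sketched in plain Java as follows. Again,
no Kafka classes are used: the Aggregator interface, the HashMap standing in
for the K-V store, and the emit() method are hypothetical. In particular, this
issue does not say what the old value inside Change<agg-value> should be; the
sketch assumes it is the aggregate as of the previous emit, and it leaves the
timer that would call emit() once per emit duration to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StageTwoSketch {

    // Hypothetical aggregator shape built from the three calls named in the steps.
    interface Aggregator<K1, V1, T> {
        T initialValue();
        T add(K1 aggKey, V1 value, T aggregate);
        T remove(K1 aggKey, V1 value, T aggregate);
    }

    // Hypothetical stand-in for Change<agg-value>.
    static final class Change<T> {
        final T newValue;
        final T oldValue;

        Change(T newValue, T oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Hypothetical record forwarded to the sink: agg-key plus Change<agg-value>.
    static final class Emitted<K1, T> {
        final K1 aggKey;
        final Change<T> change;

        Emitted(K1 aggKey, Change<T> change) {
            this.aggKey = aggKey;
            this.change = change;
        }
    }

    static final class AggregateProcessor<K1, V1, T> {
        private final Aggregator<K1, V1, T> aggregator;
        private final Map<K1, T> store = new HashMap<>();        // stand-in for the K-V store
        private final Map<K1, T> lastEmitted = new HashMap<>();  // assumption: basis for oldValue
        private final Set<K1> dirty = new LinkedHashSet<>();     // keys changed since last emit

        AggregateProcessor(Aggregator<K1, V1, T> aggregator) {
            this.aggregator = aggregator;
        }

        // Steps 2.1 and 2.2: fetch or initialize, then add or remove based on isAdd.
        void process(K1 aggKey, V1 selectedValue, boolean isAdd) {
            T aggregate = store.get(aggKey);
            if (aggregate == null) {
                aggregate = aggregator.initialValue();
            }
            aggregate = isAdd
                    ? aggregator.add(aggKey, selectedValue, aggregate)
                    : aggregator.remove(aggKey, selectedValue, aggregate);
            store.put(aggKey, aggregate);
            dirty.add(aggKey);
        }

        // Step 2.3: called by the caller once per emit duration.
        List<Emitted<K1, T>> emit() {
            List<Emitted<K1, T>> out = new ArrayList<>();
            for (K1 key : dirty) {
                T current = store.get(key);
                out.add(new Emitted<>(key, new Change<>(current, lastEmitted.get(key))));
                lastEmitted.put(key, current);
            }
            dirty.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        Aggregator<String, Integer, Integer> sum = new Aggregator<String, Integer, Integer>() {
            public Integer initialValue() { return 0; }
            public Integer add(String k, Integer v, Integer agg) { return agg + v; }
            public Integer remove(String k, Integer v, Integer agg) { return agg - v; }
        };
        AggregateProcessor<String, Integer, Integer> processor = new AggregateProcessor<>(sum);
        processor.process("eu", 10, true);
        processor.process("us", 25, true);
        processor.process("eu", 10, false);
        for (Emitted<String, Integer> e : processor.emit()) {
            System.out.println(e.aggKey + " : new=" + e.change.newValue + ", old=" + e.change.oldValue);
        }
    }
}
```

Batching updates between emits means several add(..) and remove(..) calls on
the same key collapse into one forwarded Change, which is the trade-off the
emit duration controls.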
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)