[ https://issues.apache.org/jira/browse/KAFKA-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095499#comment-15095499 ]
ASF GitHub Bot commented on KAFKA-3081:
---------------------------------------

GitHub user guozhangwang opened a pull request:

    https://github.com/apache/kafka/pull/761

    KAFKA-3081: Non-windowed Table Aggregation

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka K3081

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #761

----
commit 44a78b023a437690c4ef54b26ce3183bef002876
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-11T01:14:27Z

    add more serdes

commit 1a3f1fe2b15f5d4e291f694073d7474461b9f35d
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-12T22:00:36Z

    Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/kafka into K3081

commit 84160d0e2874c9bfba10a8dc59a3e44aac423147
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T01:07:34Z

    add table aggregation

commit e08a67867e0de330731914bacd1abc23aa0f3f20
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T02:34:10Z

    add unit test

commit c56b83b61581fa88bb7afbd893baf4adc6542cfe
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T02:37:54Z

    checkstyle fixes

----

> KTable Aggregation Implementation
> ---------------------------------
>
>                 Key: KAFKA-3081
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3081
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.9.1.0
>
>
> We need to add the implementation of the KTable aggregation operation. We
> will translate it into two stages in the underlying topology:
>
> Stage One:
> 1. No stores attached.
> 2. When receiving the record <K, Change<V>> from the upstream processor, call
>    selector.apply on both Change<V>.newValue and Change<V>.oldValue.
> 3. Forward the resulting two messages to an intermediate topic (no compaction)
>    with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean.
>
> Stage Two:
> 1. Add a K-V store with format <agg-key> : <agg-value>, with <K1> ser-de and
>    <T> ser-de.
> 2. Upon consuming a record from the intermediate topic:
>    2.1. First try to fetch the aggregate from the store; if it does not exist,
>         call initialValue().
>    2.2. Based on "isAdd", determine whether to call add(..) or remove(..).
>    2.3. Periodically, based on the emit duration, forward the aggregate value
>         to the sink node with the intermediate topic, with key <agg-key> and
>         value Change<agg-value>.

-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
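The two-stage translation described in the issue can be sketched in plain Java without any Kafka dependency. This is a minimal, hypothetical sketch, not the code from PR #761 and not the Kafka Streams API. `Change`, `Repartitioned`, `Aggregator`, `stageOne`, `StageTwo`, `emit` and `runExample` are illustrative names. A `HashMap` stands in for the K-V store, and passing Stage One output directly into Stage Two stands in for the intermediate topic. The caller is assumed to invoke `emit()` once per emit duration because the sketch has no timer. It also assumes that the selector receives the row key and value and returns the `<agg-key, selected-value>` pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, dependency-free sketch of the two-stage KTable aggregation; not the Kafka Streams API.
public class KTableAggregationSketch {

    // Stand-in for Change<V>: the new and old value of a table row (either may be null).
    record Change<V>(V newValue, V oldValue) {}
    // Stand-in for an intermediate-topic record: key <agg-key>, value <selected-value, isAdd>.
    record Repartitioned<K1, V1>(K1 aggKey, V1 selectedValue, boolean isAdd) {}
    // Hypothetical aggregator exposing initialValue(), add(..) and remove(..).
    interface Aggregator<V1, T> {
        T initialValue();
        T add(T aggregate, V1 value);
        T remove(T aggregate, V1 value);
    }

    // Stage One (stateless): apply the selector to oldValue (isAdd = false) and newValue (isAdd = true).
    static <K, V, K1, V1> List<Repartitioned<K1, V1>> stageOne(
            K key, Change<V> change, BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        List<Repartitioned<K1, V1>> out = new ArrayList<>();
        if (change.oldValue() != null) {
            Map.Entry<K1, V1> sel = selector.apply(key, change.oldValue());
            out.add(new Repartitioned<>(sel.getKey(), sel.getValue(), false));
        }
        if (change.newValue() != null) {
            Map.Entry<K1, V1> sel = selector.apply(key, change.newValue());
            out.add(new Repartitioned<>(sel.getKey(), sel.getValue(), true));
        }
        return out;
    }

    // Stage Two (stateful): a HashMap stands in for the <agg-key> : <agg-value> K-V store.
    static final class StageTwo<K1, V1, T> {
        private final Aggregator<V1, T> aggregator;
        private final Map<K1, T> store = new HashMap<>();
        // For each key changed since the last emit: its aggregate as of that emit (null if none).
        private final Map<K1, T> oldSinceEmit = new LinkedHashMap<>();
        StageTwo(Aggregator<V1, T> aggregator) { this.aggregator = aggregator; }

        void process(Repartitioned<K1, V1> rec) {
            T before = store.get(rec.aggKey());
            // 2.1: fetch from the store; fall back to initialValue() if the key is absent.
            T current = before != null ? before : aggregator.initialValue();
            // 2.2: isAdd selects add(..) or remove(..).
            T updated = rec.isAdd()
                    ? aggregator.add(current, rec.selectedValue())
                    : aggregator.remove(current, rec.selectedValue());
            store.put(rec.aggKey(), updated);
            if (!oldSinceEmit.containsKey(rec.aggKey())) {
                oldSinceEmit.put(rec.aggKey(), before);
            }
        }

        // 2.3: the caller invokes this once per emit duration; one Change<agg-value> per dirty key.
        Map<K1, Change<T>> emit() {
            Map<K1, Change<T>> out = new LinkedHashMap<>();
            for (Map.Entry<K1, T> e : oldSinceEmit.entrySet()) {
                out.put(e.getKey(), new Change<>(store.get(e.getKey()), e.getValue()));
            }
            oldSinceEmit.clear();
            return out;
        }
    }

    // Example: a user -> region table, aggregated into a count of users per region.
    static Map<String, Change<Long>> runExample() {
        BiFunction<String, String, Map.Entry<String, Long>> selector = (user, region) -> Map.entry(region, 1L);
        Aggregator<Long, Long> count = new Aggregator<>() {
            public Long initialValue() { return 0L; }
            public Long add(Long agg, Long v) { return agg + v; }
            public Long remove(Long agg, Long v) { return agg - v; }
        };
        StageTwo<String, Long, Long> stageTwo = new StageTwo<>(count);
        List<Map.Entry<String, Change<String>>> updates = List.of(
                Map.entry("alice", new Change<>("EU", null)),
                Map.entry("bob", new Change<>("US", null)),
                Map.entry("alice", new Change<>("US", "EU")));  // alice moves from EU to US
        for (Map.Entry<String, Change<String>> u : updates) {
            // Feeding Stage One output straight into Stage Two stands in for the intermediate topic.
            for (Repartitioned<String, Long> rec : stageOne(u.getKey(), u.getValue(), selector)) {
                stageTwo.process(rec);
            }
        }
        return stageTwo.emit();
    }

    public static void main(String[] args) {
        // Prints "EU -> Change[newValue=0, oldValue=null]" then "US -> Change[newValue=2, oldValue=null]".
        runExample().forEach((region, change) -> System.out.println(region + " -> " + change));
    }
}
```

Sending the old value as a removal and the new value as an addition keeps each aggregate correct when a row moves between aggregation keys, as when alice moves from EU to US above. Records for the same <agg-key> are individual deltas rather than latest-value updates. This plausibly explains why the issue requires the intermediate topic to be uncompacted: log compaction retains only the latest record per key and would discard earlier deltas. The sketch keeps a zero count (EU) in the store rather than deleting it, which is a simplification.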