[ 
https://issues.apache.org/jira/browse/KAFKA-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095499#comment-15095499
 ] 

ASF GitHub Bot commented on KAFKA-3081:
---------------------------------------

GitHub user guozhangwang opened a pull request:

    https://github.com/apache/kafka/pull/761

    KAFKA-3081: Non-windowed Table Aggregation

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka K3081

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #761
    
----
commit 44a78b023a437690c4ef54b26ce3183bef002876
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-11T01:14:27Z

    add more serdes

commit 1a3f1fe2b15f5d4e291f694073d7474461b9f35d
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-12T22:00:36Z

    Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/kafka into 
K3081

commit 84160d0e2874c9bfba10a8dc59a3e44aac423147
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T01:07:34Z

    add table aggregation

commit e08a67867e0de330731914bacd1abc23aa0f3f20
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T02:34:10Z

    add unit test

commit c56b83b61581fa88bb7afbd893baf4adc6542cfe
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2016-01-13T02:37:54Z

    checkstyle fixes

----


> KTable Aggregation Implementation
> ---------------------------------
>
>                 Key: KAFKA-3081
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3081
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.9.1.0
>
>
> We need to add the implementation of the KTable aggregation operation. We 
> will translate it into two stages in the underlying topology:
> Stage One:
> 1. No stores attached.
> 2. When receiving the record <K, Change<V>> from the upstream processor, call 
> selector.apply on both Change<V>.newValue and Change<V>.oldValue.
> 3. Forward the resulted two messages to an intermediate topic (no compaction) 
> with key <agg-key> and value <selected-value, isAdd> where isAdd is a boolean.
> Stage Two:
> 1. Add a K-V store with format <agg-key> : <agg-value> with <K1> ser-de and 
> <T> ser-de.
> 2. Upon consuming a record from the intermediate topic:
> 2.1. First try fetch from the store, if not exist call initialValue().
> 2.2. Based on "isAdd" determine to call add(..) or remove(..).
> 2.3. Forward the aggregate value periodically based on the emit duration to 
> the sink node with the intermediate topic with key <agg-key> and value 
> Change<agg-value>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to