[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350099#comment-15350099 ]

ASF GitHub Bot commented on KAFKA-3902:
---------------------------------------

GitHub user phderome opened a pull request:

    https://github.com/apache/kafka/pull/1556

    KAFKA-3902

    The contribution is my original work and I license the work to the 
    project under the project's open source license.
    
    Contributors: Guozhang Wang, Phil Derome
    @guozhangwang 
    
    Added checkEmpty to validate that the processor does nothing, and added an 
    inhibit check for the filter to fix the issue.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/phderome/kafka DEROME-3902

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1556
    
----
commit e07388bf389f09894a10e1af5916420962d47e2a
Author: Philippe Derome <phder...@gmail.com>
Date:   2016-06-26T03:16:36Z

    KAFKA-3902
    Added checkEmpty to validate that the processor does nothing, and added an 
    inhibit check for the filter to fix the issue.

----


> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}} and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. an 
> aggregation), we need to materialize this KTable and send both its old value 
> and its new value as a pair {old -> new} to the downstream operator. In 
> practice it usually needs to send the pair. Whether the old value is also 
> sent depends on the "send old value" setting, so let's discuss the two cases 
> separately. Take the following example source stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it transforms the source messages 
> into the following pairs:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key: null> to the downstream operator to indicate that 
> this key is filtered out of the table. Otherwise, for example if your filter 
> is "value < 2", the updated value <a: 3> would simply be dropped and the 
> downstream operator would keep the stale <a: 1>, resulting in incorrect 
> semantics.
> If it returns true, we should still send the original <key: value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a few cases to consider:
>     a. If the old value is <key: null> and the new value is <key: not-null>, 
> and the filter predicate returns false for the new value, then it is safe to 
> optimize and send nothing to the downstream operator, since we know there was 
> no previous value for the key anyway; otherwise we send the original pair.
>     b. If the old value is <key: not-null> and the new value is <key: null>, 
> indicating that this key is deleted, and the filter predicate returns false 
> for the old value, then it is safe to optimize and send nothing to the 
> downstream operator, since we know that the old value was already filtered 
> out in a previous message; otherwise we send the original pair.
>     c. If both the old and new values are not null, and the predicate:
>         1) returns true on both, send the original pair;
>         2) returns false on both, we can optimize and send nothing;
>         3) returns true on old and false on new, send <key: \{old -> null\}>;
>         4) returns false on old and true on new, send <key: \{null -> new\}>.
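The filtering rules in points 2 and 3 can be illustrated with a small, self-contained Java sketch. It is not the {{KTableFilter}} code from the pull request and does not use the Kafka Streams API; the {{Rec}} and {{Change}} records and the {{materialize}}, {{keep}}, {{filter}} and {{run}} methods are hypothetical, and a null return from {{filter}} is an assumed convention meaning "send nothing downstream". It replays the example stream <a: 1>, <b: 2>, <a: 3> with the predicate "value < 2".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class KTableFilterSketch {

    // Hypothetical source record <key: value>; a null value deletes the key.
    record Rec(String key, Integer value) {}

    // Hypothetical {old -> new} pair modeled on the JIRA notation (not Kafka's API).
    record Change<V>(V oldValue, V newValue) {
        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    // Point 1: materialize the table, pairing each new value with the previous one.
    static List<Map.Entry<String, Change<Integer>>> materialize(List<Rec> source) {
        Map<String, Integer> table = new HashMap<>();
        List<Map.Entry<String, Change<Integer>>> out = new ArrayList<>();
        for (Rec r : source) {
            Integer old = (r.value() == null) ? table.remove(r.key()) : table.put(r.key(), r.value());
            out.add(Map.entry(r.key(), new Change<>(old, r.value())));
        }
        return out;
    }

    // Keep a value only if it is present and passes the predicate; otherwise null.
    static <V> V keep(V value, Predicate<V> predicate) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Returns the pair to forward downstream, or null when nothing needs to be sent.
    static <V> Change<V> filter(Change<V> change, Predicate<V> predicate, boolean sendOldValues) {
        V newV = keep(change.newValue(), predicate);
        if (!sendOldValues) {
            // Point 2: always forward; a filtered value becomes a <key: null> tombstone.
            return new Change<>(null, newV);
        }
        // Point 3: filter both sides; a side that fails the predicate is treated as null.
        V oldV = keep(change.oldValue(), predicate);
        // Cases 3a, 3b and 3c-2: downstream holds no value for this key (absent or
        // already filtered) and will not receive one now, so skip the update.
        if (oldV == null && newV == null) {
            return null;
        }
        // Cases 3c-1, 3c-3, 3c-4 and the pass-through branches of 3a and 3b.
        return new Change<>(oldV, newV);
    }

    // Runs the example stream <a: 1>, <b: 2>, <a: 3> through filter(value < 2).
    static List<String> run(boolean sendOldValues) {
        List<Rec> source = List.of(new Rec("a", 1), new Rec("b", 2), new Rec("a", 3));
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<String, Change<Integer>> e : materialize(source)) {
            Change<Integer> out = filter(e.getValue(), v -> v < 2, sendOldValues);
            if (out == null) {
                continue; // optimized away: nothing is sent downstream
            }
            // Without old values, only the new value (possibly a null tombstone) is sent.
            String payload = sendOldValues ? out.toString() : String.valueOf(out.newValue());
            forwarded.add("<" + e.getKey() + ": " + payload + ">");
        }
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println("send old value disabled: " + run(false));
        System.out.println("send old value enabled:  " + run(true));
    }
}
```

With "send old value" disabled this should print [<a: 1>, <b: null>, <a: null>]: both tombstones must be sent, because the filter cannot tell whether downstream currently holds a value for the key. With it enabled it should print [<a: {null -> 1}>, <a: {1 -> null}>]: the <b: 2> update is dropped entirely (case 3.a), and <a: 3> becomes {1 -> null} (case 3.c.3).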



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
