[
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350099#comment-15350099
]
ASF GitHub Bot commented on KAFKA-3902:
---------------------------------------
GitHub user phderome opened a pull request:
https://github.com/apache/kafka/pull/1556
KAFKA-3902
The contribution is my original work and that I license the work to the
project under the project's open source license.
Contributors: Guozhang Wang, Phil Derome
@guozhangwang
Added checkEmpty to validate that the processor does nothing and added an
inhibit check for filter to fix the issue.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/phderome/kafka DEROME-3902
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/1556.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1556
----
commit e07388bf389f09894a10e1af5916420962d47e2a
Author: Philippe Derome <[email protected]>
Date: 2016-06-26T03:16:36Z
KAFKA-3902
Added checkEmpty to validate that the processor does nothing and added an
inhibit check for filter to fix the issue.
----
> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this KTable
> and send both its old value and its new value as a pair {old -> new} to
> the downstream operator. In practice it usually needs to send the pair.
> So let's discuss the two cases separately. Take the following example source
> stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs of:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key is being filtered out of the table. Otherwise, for example if your
> filter is "value < 2", then the updated value <a: 3> will just be dropped and
> downstream operators will keep the stale <a: 1>, resulting in incorrect
> semantics.
> If it returns true we should still send the original <key: value> to
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
> a. If the old value is <key: null> and the new value is <key: not-null>, and
> the filter predicate returns false for the new value, then it is safe to
> optimize and not send anything to the downstream operator, since in
> this case we know there was no value for the key previously anyway; otherwise
> we send the original pair.
> b. If the old value is <key: not-null> and the new value is <key: null>,
> indicating that this key is to be deleted, and the filter predicate returns
> false for the old value, then it is safe to optimize and not send
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both, send the original pair;
> 2) the predicate returns false on both, we can optimize and not send
> anything;
> 3) the predicate returns true on old and false on new, send the key: \{old
> -> null\};
> 4) the predicate returns false on old and true on new, send the key:
> \{null -> new\};
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)