[
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350149#comment-15350149
]
Phil Derome commented on KAFKA-3902:
------------------------------------
The problem above is not quite a real problem because enableSendingOldValues is
false by default, so nulls are still sent out as per Guozhang's description
of case 2.
However, I still don't know how to validate the fix when an app developer sets
enableSendingOldValues to true, as I don't see how that's exposed at a higher
API level.
Cheers,
Phil
> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this KTable
> and send both its old value as well as new value as a pair {old -> new} to
> the downstream operator. In practice it usually needs to send the pair.
> So let's discuss the two cases separately; take the following example source
> stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs of:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key is being filtered out of the table. Otherwise, for example if your
> filter is "value < 2", then the updated value <a: 3> will just be dropped and
> downstream operators will keep the stale <a: 1>, resulting in incorrect
> semantics.
> If it returns true we should still send the original <key: value> to
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
> a. If old value is <key: null> and new value is <key: not-null>, and the
> filter predicate returns false for the new value, then in this case it is
> safe to optimize and not return anything to the downstream operator, since
> we know there was no value for the key previously anyway; otherwise
> we send the original pair.
> b. If old value is <key: not-null> and new value is <key: null>,
> indicating to delete this key, and the filter predicate returns false for the
> old value, then in this case it is safe to optimize and not return
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
> c. If both old and new values are not null, and:
> 1) predicate returns true on both, send the original pair;
> 2) predicate returns false on both, we can optimize and not send
> anything;
> 3) predicate returns true on old and false on new, send the key: \{old
> -> null\};
> 4) predicate returns false on old and true on new, send the key:
> \{null -> new\}.
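
The cases in the description above can be sketched in Java as follows. This is
only a minimal illustration under stated assumptions, not the actual
KTableFilter code: the Change class and the method names filterNewOnly and
filterWithOld are hypothetical, and the situation where both old and new values
are null (not covered in the description) is assumed to send nothing.

```java
import java.util.Optional;
import java.util.function.Predicate;

class KTableFilterSketch {

    /** Hypothetical {old -> new} pair; either side may be null. */
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    /**
     * Case 2: "send old value" is disabled. Something is always forwarded;
     * a null result tells downstream that the key is filtered out.
     */
    static <V> V filterNewOnly(Predicate<V> predicate, V newValue) {
        return (newValue != null && predicate.test(newValue)) ? newValue : null;
    }

    /**
     * Case 3: "send old value" is enabled. Each side is kept only if it is
     * non-null and passes the predicate. An empty result means nothing is sent.
     */
    static <V> Optional<Change<V>> filterWithOld(Predicate<V> predicate, Change<V> change) {
        V oldKept = (change.oldValue != null && predicate.test(change.oldValue))
                ? change.oldValue : null;
        V newKept = (change.newValue != null && predicate.test(change.newValue))
                ? change.newValue : null;
        if (oldKept == null && newKept == null) {
            // Covers 3a, 3b and 3c-2: downstream never saw a value, so skip.
            // Assumption: a pair with both sides null is skipped as well.
            return Optional.empty();
        }
        // Covers 3c-1 (original pair), 3c-3 ({old -> null}), 3c-4 ({null -> new}).
        return Optional.of(new Change<>(oldKept, newKept));
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2;
        // Case 2 applied to the update <a: 3> with filter "value < 2".
        System.out.println(filterNewOnly(lessThanTwo, 3));
        // Case 3 applied to the pair <a: {1 -> 3}> with the same filter.
        System.out.println(filterWithOld(lessThanTwo, new Change<>(1, 3)));
    }
}
```

Computing the kept old and new sides independently collapses sub-cases a, b
and c into one rule: forward the pair unless both kept sides are null.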
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)