[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350149#comment-15350149 ]
Phil Derome commented on KAFKA-3902:
------------------------------------

The problem above is not quite a real problem because enableSendingOldValues is false by default, so nulls are still sent out as per Guozhang's description of case 2.

However, I still don't know how to validate the fix when an app developer sets enableSendingOldValues to true, as I don't see how that's exposed at a higher API level.

Cheers,

Phil

> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this KTable
> and send both its old value as well as new value as a pair {old -> new} to
> the downstream operator. In practice it usually needs to send the pair.
> So let's discuss them separately. Take the following example source
> stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs of:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key is being filtered in the table. Otherwise, for example if your
> filter is "value < 2", then the updated value <a: 3> will just be filtered,
> leaving the stale <a: 1> visible downstream and resulting in incorrect
> semantics.
> If it returns true we should still send the original <key: value> to
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
>     a. If old value is <key: null> and new value is <key: not-null>, and the
> filter predicate returns false for the new value, then in this case it is safe
> to optimize and not send anything to the downstream operator, since in
> this case we know there is no value for the key previously anyway; otherwise
> we send the original pair.
>     b. If old value is <key: not-null> and new value is <key: null>,
> indicating to delete this key, and the filter predicate returns false for the
> old value, then in this case it is safe to optimize and not send
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
>     c. If both old and new values are not null, and:
>         1) predicate returns true on both, send the original pair;
>         2) predicate returns false on both, we can optimize and not send
> anything;
>         3) predicate returns true on old and false on new, send the key: \{old
> -> null\};
>         4) predicate returns false on old and true on new, send the key:
> \{null -> new\};

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
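
For reference, the decision table in cases 2 and 3 of the issue description can be sketched in plain Java. This is only an illustration under stated assumptions, not the actual {{KTableFilter}} code: the class {{KTableFilterSketch}}, its nested {{Change}} pair and the {{filter}} method are hypothetical names made up here, and the handling of a pair where both old and new are null (dropped) is an assumption the description does not cover.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the forwarding rules from the issue description.
// None of these names come from Kafka Streams itself.
final class KTableFilterSketch {

    // Stand-in for the {old -> new} pair sent to downstream operators.
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Change)) {
                return false;
            }
            Change<?> other = (Change<?>) o;
            return Objects.equals(oldValue, other.oldValue)
                && Objects.equals(newValue, other.newValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(oldValue, newValue);
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    // Returns the pair to forward, or Optional.empty() when nothing needs to be sent.
    static <V> Optional<Change<V>> filter(Predicate<V> predicate,
                                          Change<V> change,
                                          boolean sendOldValues) {
        V newValue = keepIfAccepted(predicate, change.newValue);
        if (!sendOldValues) {
            // Case 2: always forward; a rejected value is replaced by null
            // so downstream learns that the key is filtered out.
            return Optional.of(new Change<>(null, newValue));
        }
        V oldValue = keepIfAccepted(predicate, change.oldValue);
        if (oldValue == null && newValue == null) {
            // Cases 3a, 3b and 3c-2: downstream never saw a value for this key.
            // (Also covers old == new == null, which is an assumption of this sketch.)
            return Optional.empty();
        }
        // Cases 3c-1, 3c-3 and 3c-4, plus the "otherwise" branches of 3a and 3b.
        return Optional.of(new Change<>(oldValue, newValue));
    }

    // A value survives only if it is non-null and the predicate accepts it.
    private static <V> V keepIfAccepted(Predicate<V> predicate, V value) {
        return (value != null && predicate.test(value)) ? value : null;
    }
}
```

Running the example stream from the description through this sketch with the filter "value < 2" and old values enabled: {null -> 1} is forwarded unchanged, {null -> 2} is dropped (case 3a), and {1 -> 3} becomes {1 -> null} (case 3c-3). With old values disabled, the update <a: 3> is forwarded as {null -> null}, i.e. the <a: null> of case 2.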