[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349872#comment-15349872 ]
Phil Derome commented on KAFKA-3902:
------------------------------------

Can we remove the references to proc2 in KTableFilterTest testSendingOldValue as a consequence of this ticket? Specifically, the following ones (towards the end of the file):

proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
proc2.checkAndClearProcessResult("A:(null<-2)");
proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");

My tentative fix invalidates these.

> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this KTable
> and send both its old value and its new value as a pair \{old -> new\} to
> the downstream operator. In practice it usually needs to send the pair.
> So let's discuss the two cases ("send old value" disabled and enabled)
> separately. Take the following example source stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key is being filtered out of the table. Otherwise, for example if your
> filter is "value < 2", the updated value <a: 3> would just be dropped and
> the downstream operator would still see the stale <a: 1>, resulting in
> incorrect semantics.
> If the predicate returns true we should still send the original <key: value>
> to downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
>     a. If the old value is <key: null> and the new value is <key: not-null>,
> and the filter predicate returns false for the new value, then it is safe
> to optimize and not send anything to the downstream operator, since we know
> there was no value for the key previously anyway; otherwise we send the
> original pair.
>     b. If the old value is <key: not-null> and the new value is <key: null>,
> indicating that this key is to be deleted, and the filter predicate returns
> false for the old value, then it is safe to optimize and not send anything
> to the downstream operator, since we know that the old value has already
> been filtered out by a previous message; otherwise we send the original
> pair.
>     c. If both old and new values are not null, and:
>         1) the predicate returns true on both, send the original pair;
>         2) the predicate returns false on both, we can optimize and not send
> anything;
>         3) the predicate returns true on old and false on new, send the key:
> \{old -> null\};
>         4) the predicate returns false on old and true on new, send the key:
> \{null -> new\};

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
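
For reference, the cases in items 2 and 3 of the quoted description can be sketched as a single decision function. This is only an illustration, not the actual {{KTableFilter}} code: the class {{KTableFilterSketch}}, its nested {{Change}} type, and the {{keep}} and {{filter}} helpers are hypothetical names invented here. It also assumes that a pair whose old and new values are both null is dropped when "send old value" is enabled, a case the description does not cover.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of the filter rules; not the real KTableFilter. */
final class KTableFilterSketch {

    /** An {old -> new} pair; either side may be null. */
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Change)) {
                return false;
            }
            Change<?> other = (Change<?>) o;
            return Objects.equals(oldValue, other.oldValue)
                && Objects.equals(newValue, other.newValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(oldValue, newValue);
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    /** Returns the value if it is non-null and passes the predicate, else null. */
    static <V> V keep(Predicate<V> predicate, V value) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    /**
     * Returns the pair to forward downstream, or Optional.empty() when
     * nothing needs to be sent for this update.
     */
    static <V> Optional<Change<V>> filter(Predicate<V> predicate,
                                          Change<V> change,
                                          boolean sendOldValues) {
        V newValue = keep(predicate, change.newValue);
        if (!sendOldValues) {
            // Item 2: always forward; a rejected value becomes <key: null>.
            return Optional.of(new Change<V>(null, newValue));
        }
        V oldValue = keep(predicate, change.oldValue);
        if (oldValue == null && newValue == null) {
            // Items 3a, 3b and 3c.2: the key was not visible downstream
            // before and is not visible now, so nothing is sent.
            // Assumption: a {null -> null} input pair is dropped as well.
            return Optional.empty();
        }
        // Items 3c.1, 3c.3, 3c.4 and the "otherwise" branches of 3a and 3b.
        return Optional.of(new Change<V>(oldValue, newValue));
    }
}
```

With the predicate "value < 2" and the example stream from the description, this sketch forwards {{<a: \{null -> 1\}>}} unchanged, sends nothing for {{<b: \{null -> 2\}>}}, and turns {{<a: \{1 -> 3\}>}} into {{<a: \{1 -> null\}>}} when "send old value" is enabled.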