[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349932#comment-15349932 ]
Phil Derome commented on KAFKA-3902:
------------------------------------

This is a one-line fix, but I am unfamiliar with GitHub and with submitting a proper pull request, so the code is below. I used 10.0.0 as the base. Files changed: KTableFilter.java (one line), KTableFilterTest.java (about four lines) and MockProcessorSupplier.java (a new one-line method).

KTableFilter.java:

{code:java}
private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {

    @Override
    public void process(K key, Change<V> change) {
        V newValue = computeValue(key, change.newValue);
        V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

        // Both the old and the new value are filtered out (or null), so the
        // downstream operator holds no value for this key: nothing to forward.
        if (sendOldValues && oldValue == null && newValue == null) {
            return;
        }

        context().forward(key, new Change<>(newValue, oldValue));
    }
}
{code}

KTableFilterTest.java:

{code:java}
@Test
public void testSendingOldValue() throws IOException {
    KStreamBuilder builder = new KStreamBuilder();

    String topic1 = "topic1";

    KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
    KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return (value % 2) == 0;
                }
            });

    table2.enableSendingOldValues();

    MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();

    builder.addProcessor("proc1", proc1, table1.name);
    builder.addProcessor("proc2", proc2, table2.name);

    driver = new KStreamTestDriver(builder, stateDir, null, null);

    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 1);
    driver.process(topic1, "C", 1);

    proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
    // Nothing is forwarded: every value is odd, so every value is filtered out.
    proc2.checkEmpty();

    driver.process(topic1, "A", 2);
    driver.process(topic1, "B", 2);

    proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
    // 2 passes the filter, so both A and B appear downstream.
    proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

    driver.process(topic1, "A", 3);

    proc1.checkAndClearProcessResult("A:(3<-2)");
    // A's new value 3 is filtered out, so A is deleted downstream; B is unchanged.
    proc2.checkAndClearProcessResult("A:(null<-2)");

    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);

    proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
    // Only B's deletion is forwarded; A's old value 3 was already filtered out.
    proc2.checkAndClearProcessResult("B:(null<-2)");
}
{code}

MockProcessorSupplier.java:

{code:java}
public void checkEmpty() {
    assertEquals("the number of outputs:", 0, processed.size());
}
{code}

> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}} and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
>
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> an aggregation), we need to materialize this KTable and send both its old
> value and its new value as a pair {old -> new} to the downstream operator.
> In practice it usually needs to send the pair; the cases with and without
> old values are discussed separately below (points 2 and 3). Take the
> following example source stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it transforms the source messages
> into the pairs:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
>
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key has been filtered out of the table.
> Otherwise, if your filter is "value < 2", for example, the update <a: 3>
> would simply be dropped and downstream would still hold the stale <a: 1>,
> resulting in incorrect semantics. If the predicate returns true, we send the
> original <key: value> to downstream operators.
>
> 3. If "send old value" is enabled, there are several cases to consider:
> a. If the old value is <key: null>, the new value is <key: not-null>, and
> the filter predicate returns false for the new value, then it is safe to
> optimize by not sending anything to the downstream operator, since we know
> the key had no previous value anyway; otherwise we send the original pair.
> b. If the old value is <key: not-null> and the new value is <key: null>,
> indicating that this key is deleted, and the filter predicate returns false
> for the old value, then it is safe to optimize by not sending anything to
> the downstream operator, since the old value was already filtered out by a
> previous message; otherwise we send the original pair.
> c. If both the old and the new value are not null, and the predicate:
>    1) returns true on both, send the original pair;
>    2) returns false on both, we can optimize and send nothing;
>    3) returns true on old and false on new, send key: \{old -> null\};
>    4) returns false on old and true on new, send key: \{null -> new\}.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
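The rules in points 2 and 3 of the quoted description collapse into one check: with old values enabled, forward nothing when the filtered old value and the filtered new value are both null. Below is a minimal, self-contained sketch of that decision under a simplified model. `FilterForwarding`, its nested `Change` class, and `forward` are hypothetical stand-ins written for illustration, not Kafka Streams APIs, and null stands for both a deleted value and a filtered-out value.

```java
import java.util.function.BiPredicate;

// Hypothetical sketch of the KAFKA-3902 forwarding rules. FilterForwarding and
// Change are illustrative stand-ins, not the Kafka Streams classes.
public class FilterForwarding {

    // Minimal stand-in for an update pair, printed as (new<-old).
    public static final class Change<V> {
        public final V newValue;
        public final V oldValue;

        public Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }

        @Override
        public String toString() {
            return "(" + newValue + "<-" + oldValue + ")";
        }
    }

    // Returns the value if it is non-null and passes the predicate, else null,
    // so a deleted value and a filtered-out value look the same downstream.
    static <K, V> V computeValue(BiPredicate<K, V> predicate, K key, V value) {
        return (value != null && predicate.test(key, value)) ? value : null;
    }

    // Returns the Change to forward downstream, or null if nothing is forwarded.
    public static <K, V> Change<V> forward(BiPredicate<K, V> predicate, K key,
                                           Change<V> change, boolean sendOldValues) {
        V newValue = computeValue(predicate, key, change.newValue);
        if (!sendOldValues) {
            // Point 2: always forward, so a filtered key arrives as a null value.
            return new Change<>(newValue, null);
        }
        V oldValue = computeValue(predicate, key, change.oldValue);
        if (oldValue == null && newValue == null) {
            // Points 3a, 3b and 3c.2: downstream holds no value for this key.
            return null;
        }
        // Points 3c.1, 3c.3, 3c.4 and the non-optimized halves of 3a and 3b.
        return new Change<>(newValue, oldValue);
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> isEven = (k, v) -> v % 2 == 0;
        System.out.println(forward(isEven, "A", new Change<>(1, null), true));  // null
        System.out.println(forward(isEven, "A", new Change<>(2, 1), true));     // (2<-null)
        System.out.println(forward(isEven, "A", new Change<>(3, 2), true));     // (null<-2)
        System.out.println(forward(isEven, "A", new Change<>(null, 3), true));  // null
        System.out.println(forward(isEven, "A", new Change<>(3, null), false)); // (null<-null)
    }
}
```

The first four calls in `main` replay the updates to key A from the test above (1, 2, 3, then a delete), and the last shows point 2 with old values disabled. Cases 3a, 3b and 3c.2 can share one null check because each of them leaves both filtered values null.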