[ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349932#comment-15349932
 ] 

Phil Derome commented on KAFKA-3902:
------------------------------------

one liner fix, but am unfamiliar with github and submitting proper pull 
request. Code is below, I used 10.0.0 as a base.

Files: KTableFilter.java (1 liner), KTableFilterTest.java (4 lines changes or 
so), MockProcessorSupplier.java (new 1 liner method)

KTableFilter.java
   private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {

        @Override
        public void process(K key, Change<V> change) {
            V newValue = computeValue(key, change.newValue);
            V oldValue = sendOldValues ? computeValue(key, change.oldValue) : 
null;

            if (sendOldValues && oldValue == null && newValue == null) return; 
// unnecessary to forward here.
            context().forward(key, new Change<>(newValue, oldValue));
        }

    }

KTableFilterTest.java
   @Test
    public void testSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();

        String topic1 = "topic1";

        KTableImpl<String, Integer, Integer> table1 =
                (KTableImpl<String, Integer, Integer>) 
builder.table(stringSerde, intSerde, topic1);
        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, 
Integer, Integer>) table1.filter(
                new Predicate<String, Integer>() {
                    @Override
                    public boolean test(String key, Integer value) {
                        return (value % 2) == 0;
                    }
                });

        table2.enableSendingOldValues();

        MockProcessorSupplier<String, Integer> proc1 = new 
MockProcessorSupplier<>();
        MockProcessorSupplier<String, Integer> proc2 = new 
MockProcessorSupplier<>();

        builder.addProcessor("proc1", proc1, table1.name);
        builder.addProcessor("proc2", proc2, table2.name);

        driver = new KStreamTestDriver(builder, stateDir, null, null);

        driver.process(topic1, "A", 1);
        driver.process(topic1, "B", 1);
        driver.process(topic1, "C", 1);

        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
        proc2.checkEmpty(); // we got nothing since all inputs are odd or 
filtered out

        driver.process(topic1, "A", 2);
        driver.process(topic1, "B", 2);

        proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); // we 
are informed of 2 making it in for both A and B

        driver.process(topic1, "A", 3);

        proc1.checkAndClearProcessResult("A:(3<-2)");
        proc2.checkAndClearProcessResult("A:(null<-2)"); // no change for B but 
A is deleted

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);

        proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
        proc2.checkAndClearProcessResult("B:(null<-2)");  // B is deleted from 
source Table1
    }

MockProcessorSupplier.java:
    public void checkEmpty() {
        assertEquals("the number of outputs:", 0, processed.size());
    }


> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operators (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss about them separately, take the following example source 
> stream for your KTable
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<a: \{null -> 1\}>, <b: \{nul -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key: null> to the downstream operator to indicate that 
> this key is being filtered in the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <a: 3> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <key: value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
>     a. If old value is <key: null> and new value is <key: not-null>, and the 
> filter predicate return false for the new value, then in this case it is safe 
> to optimize and not returning anything to the downstream operator, since in 
> this case we know there is no value for the key previously anyways; otherwise 
> we send the original pair.
>     b. If old value is <key: not-null> and new value is <key: null>, 
> indicating to delete this key, and the filter predicate return false for the 
> old value, then in this case it is safe to optimize and not returning 
> anything to the downstream operator, since we know that the old value has 
> already been filtered in a previous message; otherwise we send the original 
> pair.
>     c. If both old and new values are not null, and:
>         1) predicate return true on both, send the original pair;
>         2) predicate return false on both, we can optimize and do not send 
> anything;
>         3) predicate return true on old and false on new, send the key: \{old 
> -> null\};
>         4) predicate return false on old and true on new, send the key: 
> \{null -> new\};



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to