[
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349932#comment-15349932
]
Phil Derome commented on KAFKA-3902:
------------------------------------
This is a one-line fix, but I am unfamiliar with GitHub and with submitting a
proper pull request, so the code is below. I used 10.0.0 as a base.
Files: KTableFilter.java (one-line change), KTableFilterTest.java (about 4
changed lines), MockProcessorSupplier.java (one new one-line method)
KTableFilter.java:
private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
    @Override
    public void process(K key, Change<V> change) {
        V newValue = computeValue(key, change.newValue);
        V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
        if (sendOldValues && oldValue == null && newValue == null) return; // unnecessary to forward here.
        context().forward(key, new Change<>(newValue, oldValue));
    }
}
KTableFilterTest.java:
@Test
public void testSendingOldValue() throws IOException {
    KStreamBuilder builder = new KStreamBuilder();
    String topic1 = "topic1";

    KTableImpl<String, Integer, Integer> table1 =
        (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
    KTableImpl<String, Integer, Integer> table2 =
        (KTableImpl<String, Integer, Integer>) table1.filter(
            new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return (value % 2) == 0;
                }
            });
    table2.enableSendingOldValues();

    MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    builder.addProcessor("proc1", proc1, table1.name);
    builder.addProcessor("proc2", proc2, table2.name);

    driver = new KStreamTestDriver(builder, stateDir, null, null);

    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 1);
    driver.process(topic1, "C", 1);
    proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
    // we got nothing since all inputs are odd or filtered out
    proc2.checkEmpty();

    driver.process(topic1, "A", 2);
    driver.process(topic1, "B", 2);
    proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
    // we are informed of 2 making it in for both A and B
    proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

    driver.process(topic1, "A", 3);
    proc1.checkAndClearProcessResult("A:(3<-2)");
    // no change for B but A is deleted
    proc2.checkAndClearProcessResult("A:(null<-2)");

    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
    // B is deleted from source Table1
    proc2.checkAndClearProcessResult("B:(null<-2)");
}
MockProcessorSupplier.java:
public void checkEmpty() {
    assertEquals("the number of outputs:", 0, processed.size());
}
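For anyone who wants to see the effect without building Kafka, here is a
minimal, dependency-free sketch that replays the inputs from the test above.
It is not the Kafka Streams API: FilterReplaySketch and its methods are
hypothetical names, a HashMap stands in for the materialized source table, and
computeValue is assumed to return null both for a null input and for a value
that fails the predicate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the patched KTableFilter with "send old values" on.
class FilterReplaySketch {
    // Stands in for the materialized source table (table1 in the test).
    private final Map<String, Integer> source = new HashMap<>();
    private final Predicate<Integer> predicate;
    // What a processor attached to the filtered table would receive.
    final List<String> forwarded = new ArrayList<>();

    FilterReplaySketch(Predicate<Integer> predicate) {
        this.predicate = predicate;
    }

    // Assumed behavior of computeValue(): null stays null, and a value that
    // fails the predicate becomes null.
    private Integer computeValue(Integer value) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Applies one source update; a null value deletes the key.
    void process(String key, Integer value) {
        Integer previous = (value == null) ? source.remove(key) : source.put(key, value);
        Integer newValue = computeValue(value);
        Integer oldValue = computeValue(previous);
        if (oldValue == null && newValue == null) {
            return; // the one-line fix: nothing changed in the filtered view
        }
        forwarded.add(key + ":(" + newValue + "<-" + oldValue + ")");
    }

    // Replays the same inputs as testSendingOldValue() with an "is even" filter.
    static List<String> replayTestInputs() {
        FilterReplaySketch table = new FilterReplaySketch(v -> v % 2 == 0);
        table.process("A", 1);
        table.process("B", 1);
        table.process("C", 1);
        table.process("A", 2);
        table.process("B", 2);
        table.process("A", 3);
        table.process("A", null);
        table.process("B", null);
        return table.forwarded;
    }

    public static void main(String[] args) {
        System.out.println(replayTestInputs());
    }
}
```

Only four changes reach the downstream side in this model; the three odd
inserts and the delete of A (whose last value, 3, was already filtered out)
are suppressed.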
> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can
> be optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this KTable
> and send both its old value and its new value as a pair {old -> new} to
> the downstream operator. In practice it usually needs to send the pair.
> So let's discuss the two cases separately. Take the following example source
> stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false, we MUST send a <key: null> to the downstream operator to indicate that
> this key has been filtered out of the table. Otherwise, for example if your
> filter is "value < 2", then the updated value <a: 3> will just be dropped
> and downstream will keep the stale <a: 1>, resulting in incorrect semantics.
> If it returns true we should still send the original <key: value> to
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
>   a. If the old value is <key: null> and the new value is <key: not-null>,
> and the filter predicate returns false for the new value, then it is safe
> to optimize and not send anything to the downstream operator, since in
> this case we know there was no value for the key previously anyway; otherwise
> we send the original pair.
>   b. If the old value is <key: not-null> and the new value is <key: null>,
> indicating that this key is to be deleted, and the filter predicate returns
> false for the old value, then it is safe to optimize and not send
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
>   c. If both old and new values are not null, and:
>     1) the predicate returns true on both, send the original pair;
>     2) the predicate returns false on both, we can optimize and not send
> anything;
>     3) the predicate returns true on old and false on new, send the key:
> \{old -> null\};
>     4) the predicate returns false on old and true on new, send the key:
> \{null -> new\};
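
The cases in items 2 and 3 above can be written as one small decision
function. This is only a sketch under stated assumptions: FilterDecisionSketch,
decide and keep are hypothetical names rather than Kafka classes, and changes
are rendered as plain strings in the issue's {old -> new} notation. It shows
that cases 3a, 3b and 3c.2 all reduce to "both filtered values are null".

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper modelling the forwarding rules from the issue description.
class FilterDecisionSketch {
    // A value survives only if it is non-null and passes the predicate.
    static <V> V keep(V value, Predicate<V> predicate) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Returns the change to forward as "{old -> new}", or empty to send nothing.
    static <V> Optional<String> decide(V oldValue, V newValue,
                                       Predicate<V> predicate, boolean sendOldValues) {
        V filteredNew = keep(newValue, predicate);
        // Without "send old value" the old side is never populated.
        V filteredOld = sendOldValues ? keep(oldValue, predicate) : null;
        if (sendOldValues && filteredOld == null && filteredNew == null) {
            // Cases 3a, 3b and 3c.2: downstream never saw this key, so skip it.
            return Optional.empty();
        }
        // Item 2 and the remaining cases of item 3: forward the filtered pair.
        return Optional.of("{" + filteredOld + " -> " + filteredNew + "}");
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2; // the filter from the issue text
        System.out.println(decide(null, 3, lessThanTwo, true));  // case 3a
        System.out.println(decide(3, null, lessThanTwo, true));  // case 3b
        System.out.println(decide(0, 1, lessThanTwo, true));     // case 3c.1
        System.out.println(decide(3, 4, lessThanTwo, true));     // case 3c.2
        System.out.println(decide(1, 3, lessThanTwo, true));     // case 3c.3
        System.out.println(decide(3, 1, lessThanTwo, true));     // case 3c.4
        System.out.println(decide(1, 3, lessThanTwo, false));    // item 2
    }
}
```

With "send old value" disabled the function always forwards, so a value that
fails the predicate still reaches downstream as a null, as item 2 requires.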