[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350131#comment-15350131 ]
Phil Derome commented on KAFKA-3902:
------------------------------------

I am currently having a problem when porting the Confluent UserRegionLambdaExample into the Kafka build after converting it to JDK 7. The problem shows up at run time with LongDeserializer in the console consumer. I am still investigating, and I would say the fix is not ready until this issue is addressed.

bin/kafka-console-consumer --topic LargeRegions --from-beginning \
  --zookeeper localhost:2181 --property print.key=true \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

asia
Processed a total of 1 messages
[2016-06-26 10:22:22,126] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
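For context on the failure above: LongDeserializer only accepts payloads of exactly 8 bytes, which is what LongSerializer produces. The small standalone check below is my own illustration of that contract, not part of the example being ported; the class name LongSerdeCheck and the topic string are arbitrary, and it only assumes kafka-clients on the classpath.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class LongSerdeCheck {
    public static void main(String[] args) {
        LongSerializer serializer = new LongSerializer();
        LongDeserializer deserializer = new LongDeserializer();

        // LongSerializer encodes every value as exactly 8 bytes.
        byte[] bytes = serializer.serialize("LargeRegions", 42L);
        System.out.println(bytes.length);                                    // prints 8
        System.out.println(deserializer.deserialize("LargeRegions", bytes)); // prints 42

        // A payload written with any other serializer (e.g. a string-encoded count)
        // will generally not be 8 bytes long, and LongDeserializer then throws the
        // SerializationException seen in the console consumer output above.
    }
}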
> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}} and can be
> optimized to reduce unnecessary data traffic to downstream operators. More
> specifically:
>
> 1. Some context: when a KTable participates in downstream operators (e.g. if
> that operator is an aggregation), we need to materialize this KTable and send
> both its old value and its new value as a pair {old -> new} to the downstream
> operator. In practice it usually needs to send the pair.
>
> So let's discuss the cases separately. Take the following example source
> stream for your KTable:
>
> {{<a: 1>, <b: 2>, <a: 3> ...}}
>
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs:
>
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
>
> 2. If "send old value" is not enabled, then when the filter predicate returns
> false we MUST send a <key: null> to the downstream operator to indicate that
> this key has been filtered out of the table. Otherwise, for example if your
> filter is "value < 2", the updated value <a: 3> would just be dropped,
> resulting in incorrect semantics. If the predicate returns true, we should
> still send the original <key: value> to downstream operators.
>
> 3. If "send old value" is enabled, there are a few cases to consider (see the
> sketch below):
>
> a. If the old value is <key: null> and the new value is <key: not-null>, and
> the filter predicate returns false for the new value, then it is safe to
> optimize and send nothing to the downstream operator, since we know there was
> no value for this key previously anyway; otherwise we send the original pair.
>
> b. If the old value is <key: not-null> and the new value is <key: null>,
> indicating a delete for this key, and the filter predicate returns false for
> the old value, then it is safe to optimize and send nothing to the downstream
> operator, since we know the old value has already been filtered out by a
> previous message; otherwise we send the original pair.
>
> c. If both old and new values are not null:
>
> 1) the predicate returns true on both: send the original pair;
> 2) the predicate returns false on both: we can optimize and send nothing;
> 3) the predicate returns true on old and false on new: send the key with \{old -> null\};
> 4) the predicate returns false on old and true on new: send the key with \{null -> new\}.
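Put together, the case analysis above amounts to: when old values are being sent, drop the update entirely if neither side of the change survives the filter, and otherwise forward whatever survives, with null standing in for a filtered or absent value. The following is a minimal self-contained sketch of that logic, not the actual KTableFilter code; the Change helper class, the filteredChange method and the sendOldValues flag are names used purely for illustration.

import java.util.function.Predicate;

public class KTableFilterSketch {

    // Local stand-in for an old/new value pair; NOT the Kafka Streams internal Change class.
    static final class Change<V> {
        final V oldValue;
        final V newValue;
        Change(V oldValue, V newValue) { this.oldValue = oldValue; this.newValue = newValue; }
        @Override public String toString() { return "{" + oldValue + " -> " + newValue + "}"; }
    }

    // Returns the change to forward downstream, or null if the update can be dropped.
    static <V> Change<V> filteredChange(Change<V> change, Predicate<V> predicate, boolean sendOldValues) {
        // A side of the pair "survives" only if it is non-null and passes the predicate.
        V filteredOld = (change.oldValue != null && predicate.test(change.oldValue)) ? change.oldValue : null;
        V filteredNew = (change.newValue != null && predicate.test(change.newValue)) ? change.newValue : null;

        if (!sendOldValues) {
            // Case 2: always forward; a null new value is the tombstone telling
            // downstream operators that this key is filtered out of the table.
            return new Change<>(null, filteredNew);
        }
        if (filteredOld == null && filteredNew == null) {
            // Cases 3a, 3b and 3c-2): downstream would only see {null -> null}, so send nothing.
            return null;
        }
        // Cases 3c-1), 3c-3) and 3c-4): forward whatever survived on each side.
        return new Change<>(filteredOld, filteredNew);
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2;

        // <a: {1 -> 3}> with filter "value < 2": old passes, new does not -> {1 -> null} (case 3c-3).
        System.out.println(filteredChange(new Change<Integer>(1, 3), lessThanTwo, true));

        // <a: {3 -> 5}>: neither passes -> null, i.e. nothing is forwarded (case 3c-2).
        System.out.println(filteredChange(new Change<Integer>(3, 5), lessThanTwo, true));

        // Without old values, a filtered-out new value still yields a tombstone {null -> null} (case 2).
        System.out.println(filteredChange(new Change<Integer>(null, 3), lessThanTwo, false));
    }
}

Normalizing a failed or absent value to null up front is what lets cases a, b and c-2) collapse into the single "both sides null, send nothing" check.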