Hello Philippe, I think your question really has two parts:
1. What is the semantic difference between a KTable and a KStream, and more specifically, how should (key, null) be interpreted in a KTable? You can find some explanation in this documentation:

http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream

Note that a KTable is itself still a stream behind the scenes, although it may be materialized when necessary. To answer your question specifically: (key, null) can be treated as a tombstone for the given key, and when the KTable's stream is materialized, it results in a "delete" on the materialized view.

2. As for the "filter" operator: yes, it will generate a large number of (key, null) records, each indicating a "delete" in the resulting KTable, and hence a lot of traffic to the piped topic. However, we are working on KIP-63, which unifies store and downstream caching and covers the `KTable.to` operator as well, so that de-duplication can be done in this operator and the outgoing traffic can be greatly reduced:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams

Guozhang

On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phder...@gmail.com> wrote:

> I made a modification to Confluent's latest example,
> UserRegionLambdaExample. See the relevant code at the end of this email.
>
> Am I correct in understanding that KTable semantics should be similar to a
> store-backed cache of a view (as per Wikipedia on materialized views), or
> similar to Oracle's materialized views and indexed views? More
> specifically, I am looking at when a (key, null value) pair can make it
> into a KTable when generating the table from a valid KStream with a false
> filter.
>
> Here is the relevant code, modified from the example, for which I observed
> that all keys within userRegions are sent out to the topic LargeRegions with
> a null value.
> I would think that both the regionCounts KTable and the topic LargeRegions
> should be empty, so that the cached view agrees with the intended query (a
> query with an intentionally empty result set, as the filter is intentionally
> false since 1 >= 2 never holds).
>
> I am not sure I understand the implications properly, as I am new, but it
> seems possible that a highly selective filter on a large incoming stream
> would result in high memory usage for regionCounts and hence for the stream
> application.
>
> KTable<String, String> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key
>     // and value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions FOR SAKE OF EXAMPLE
>     .filter((regionName, count) -> 1 >= 2)
>     .mapValues(count -> count.toString());
>
> KStream<String, String> regionCountsForConsole = regionCounts.toStream();
>
> regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
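To make the tombstone semantics of point 1 and the filter behavior of point 2 concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: the names `ChangelogSketch`, `Update`, `materialize`, `filter` and `dedupe` are hypothetical, and it assumes Java 16+ for the record. It models a KTable filter as forwarding (key, null) for every update whose value fails the predicate, as described above, and it models KIP-63-style caching loosely as keeping only the latest update per key within a batch, not as the actual cache implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class ChangelogSketch {

    // One changelog record (hypothetical type); a null value is a tombstone for the key.
    record Update(String key, Long value) {}

    // Apply a changelog to a materialized view: non-null values upsert, null deletes.
    static Map<String, Long> materialize(List<Update> changelog) {
        Map<String, Long> view = new LinkedHashMap<>();
        for (Update u : changelog) {
            if (u.value() == null) {
                view.remove(u.key());
            } else {
                view.put(u.key(), u.value());
            }
        }
        return view;
    }

    // Model of filtering a table's changelog: an update that fails the predicate
    // is forwarded as (key, null) instead of being dropped, so that a value
    // previously forwarded for the same key gets deleted downstream.
    static List<Update> filter(List<Update> changelog, BiPredicate<String, Long> predicate) {
        List<Update> out = new ArrayList<>();
        for (Update u : changelog) {
            boolean keep = u.value() != null && predicate.test(u.key(), u.value());
            out.add(new Update(u.key(), keep ? u.value() : null));
        }
        return out;
    }

    // Loose model of caching before forwarding: keep only the latest update per key.
    static List<Update> dedupe(List<Update> batch) {
        Map<String, Update> latest = new LinkedHashMap<>();
        for (Update u : batch) {
            latest.put(u.key(), u);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // Running counts per region, as a count() would emit them.
        List<Update> counts = List.of(
            new Update("europe", 1L), new Update("asia", 1L),
            new Update("europe", 2L), new Update("asia", 2L),
            new Update("europe", 3L));

        // Always-false predicate, as in the example above.
        List<Update> filtered = filter(counts, (region, count) -> 1 >= 2);
        System.out.println(filtered.size());                 // 5
        System.out.println(materialize(filtered).isEmpty()); // true
        System.out.println(dedupe(filtered).size());         // 2

        // A predicate that some updates pass: earlier tombstones are overwritten.
        System.out.println(materialize(filter(counts, (region, count) -> count >= 2)));
        // {europe=3, asia=2}
    }
}
```

In this sketch the materialized view stays empty under the always-false filter, so the tombstones cost no space in the view itself; the cost is the five records forwarded downstream, which collapse to one per key once updates are de-duplicated before forwarding.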