Hello Philippe,

I think your question is really in two-folds:

1. What is the semantic difference between a KTable and a KStream, and more
specifically how should we interpret (key, null) in KTable?

You can find some explanations in this documentation:
http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream

Note that KTable itself is still a stream behind the scene, although it may
be materialized when necessary. And specifically to your question, (key,
null) can be treated as a tombstone on the specified key, and when this
KTable stream is materialized, it will result in a "delete" on materialized
view.


2. As for the "filter" operator, yes it will generate a large amount of
(key, null) records which indicates "delete" in the resulted KTable, and
hence large traffic to the piped topic. But we are working on KIP-63 which
unifies the caching mechanism in the `KTable.to` operator as well so that
de-duping can be done in this operator and hence the outgoing traffic can
be largely reduced:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams


Guozhang


On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phder...@gmail.com> wrote:

> I made a modification of latest Confluent's example
> UserRegionLambdaExample. See relevant code at end of email.
>
> Am I correct in understanding that KTable semantics should be similar to a
> store-backed cache of a view as (per wikipedia on materialized views) or
> similar to Oracle's materialized views and indexed views? More
> specifically, I am looking at when a (key, null value) pair can make it
> into KTable on generating table from a valid KStream with a false filter.
>
> Here's relevant code modified from example for which I observed that all
> keys within userRegions are sent out to topic LargeRegions with a null
> value. I would think that both regionCounts KTable and topic LargeRegions
> should be empty so that the cached view agrees with the intended query (a
> query with an intentional empty result set as the filter is intentionally
> false as 1 >= 2).
>
> I am not sure I understand implications properly as I am new but it seems
> possible that  a highly selective filter from a large incoming stream would
> result in high memory usage for regionCounts and hence the stream
> application.
>
> KTable<String, *String*> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key
> and value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions FOR SAKE OF EXAMPLE
>     .filter((regionName, count) -> *1 >= 2*)
>     .mapValues(count -> count.toString());
>
>
> KStream<String, *String*> regionCountsForConsole = regionCounts.toStream();
>
> regionCountsForConsole.to(stringSerde, *stringSerde*, "LargeRegions");
>



-- 
-- Guozhang

Reply via email to