The boolean flag can be set to true by a child operator that requires the source to be materialized; more details can be found in this design wiki:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65143671

But just to give you a concrete idea, if your topology is defined as:

    KTable table1 = builder.table("topic1");
    table1.filter(..).mapValues(..).to("topic2");

then enableSendOldValues will not be set to true and table1 will not be materialized. If your topology is defined as:

    KTable table1 = builder.table("topic1");
    KTable table2 = table1.filter(..).mapValues(..).groupBy(..).aggregate(..);

then enableSendOldValues will be called from the aggregate() operator, propagated back to its parent, and then all the way back to the source table1 (i.e. the KTableSource).

Guozhang

On Mon, Jun 27, 2016 at 3:00 PM, Philippe Derome <phder...@gmail.com> wrote:

Then I don't see any simple solution here, at least for a novice, especially since I don't know what can trigger the boolean flag to true.

On 27 Jun 2016 5:38 p.m., "Guozhang Wang" <wangg...@gmail.com> wrote:

My concern is that the overhead of requesting the source KTable to be materialized (i.e. creating a state store, and sending the {old -> new} pair instead of the new value only) may be overwhelming compared with its potential benefit of reducing the downstream traffic.

Guozhang

On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome <phder...@gmail.com> wrote:

Guozhang,

would you say it's advisable to initialize KTableFilter.sendOldValues to true instead of false? That's what I see that can trigger your described case 3 to potentially desirable effect, but I didn't include it in the pull request. If it is left at its default value of false, I don't know what mechanism should override it to true.

Phil

On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks! You can follow this step-by-step guidance to contribute to Kafka via GitHub:
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest

Guozhang

On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome <phder...@gmail.com> wrote:

I have a one-liner solution for this in KTableFilter.java and about 5-6 lines of changes to the existing unit test KTableFilterTest.testSendingOldValue. I included those lines with context in the JIRA. I am struggling a bit with GitHub, being new to it and to doing a proper pull request, so hopefully that can be followed up by you? I had the streams test suite pass aside from a few cases that pertain specifically to this JIRA, as the assumptions have now changed.

On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Philippe,

Great, since you agree with my reasoning, I have created a JIRA ticket for optimizing KTableFilter (feel free to pick it up if you are interested in contributing):

https://issues.apache.org/jira/browse/KAFKA-3902

About case 3-c-1), what I meant is that since the "predicate returns true on both", the resulting pair would just be the same as the original pair.

About KIP-63: it is itself a rather big story, but it has one correspondence to this JIRA. With caching you can dedup some records with the same key; for example, suppose the input records to the KTable are:

    <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
If the KTable is materialized into a state store with a cache on top of it, then the resulting downstream could be:

    <a: {null -> 1}>, <a: {1 -> 6}> ...

instead of

    <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ...

So if it is piped to a filter() operator, even less data will be produced.

Guozhang

On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <phder...@gmail.com> wrote:

Yes, it looks very good. Your detailed explanation is compelling enough to reveal that some of the complexity of a streams system is probably inherent (not that I dared assume it was "easy", but I could afford to be conveniently unaware). It took me 30 minutes to grasp this latest response.

There might be a typo in your email for case 3.c.1), as I would think we should send the most recent pair as opposed to the original; in any event it does not materially impact your presentation.

Your case 3a) is really what triggered my line of questioning, and I found the current behaviour vexing, as it may lead to an undesirable but necessary filter before writing the output topic to the console (see Michael G. Noll's fix at the very end of UserRegionLambdaExample, which tries to weed out nulls).
Without looking at the design, it seemed self-evident to me that the 3a) behaviour had to be implemented (from my point of view, with the code example I was looking at, it simply means: never say to delete a key that was never created; simply don't "create a deleted" key).

Likewise, cases 3b) and 3c) look very reasonable.

Just out of curiosity, did you effectively just restate the essence of KIP-63 in more approachable language I could understand, or is KIP-63 really a different beast?

On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Philippe,

Very good points. Let me dump my thoughts about "KTable.filter" specifically and how we can improve on it:

1. Some context: when a KTable participates in a downstream operator (e.g. if that operator is an aggregation), we need to materialize this KTable and send both its old value and its new value as a pair {old -> new} to the downstream operator. In practice it usually needs to send the pair.

So let's discuss the two modes (with and without sending the old value) separately. Take the following example source stream for your KTable:

    <a: 1>, <b: 2>, <a: 3> ...
When the KTable needs to be materialized, it will transform the source messages into the pairs:

    <a: {null -> 1}>, <b: {null -> 2}>, <a: {1 -> 3}>

2. If "send old value" is not enabled, then when the filter predicate returns false, we MUST send a <key: null> to the downstream operator to indicate that this key is being filtered out of the table. Otherwise, for example, if your filter is "value < 2", the updated value <a: 3> would simply be dropped and downstream would keep the stale <a: 1>, resulting in incorrect semantics.

If it returns true, we should still send the original <key: value> to the downstream operators.

3. If "send old value" is enabled, then there are a couple of cases we can consider:

   a. If the old value is <key: null> and the new value is <key: not-null>, and the filter predicate returns false for the new value, then it is safe to optimize and not send anything to the downstream operator, since we know there was no value for the key previously anyway; otherwise we send the original pair.

   b. If the old value is <key: not-null> and the new value is <key: null>, indicating that this key is to be deleted, and the filter predicate returns false for the old value, then it is safe to optimize and not send anything to the downstream operator, since we know that the old value has already been filtered in a previous message; otherwise we send the original pair.

   c. If both the old and new values are not null, and the predicate:

      1) returns true on both, send the original pair;

      2) returns false on both, we can optimize and not send anything;

      3) returns true on old and false on new, send key: {old -> null};

      4) returns false on old and true on new, send key: {null -> new}.

Does this sound good to you?

Guozhang

On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <phder...@gmail.com> wrote:

Thanks a lot for the detailed feedback, its clarity, and the reference to KIP-63, which, however, is for the most part above my head for now.
Having said that, I still hold the view that the behaviour I presented is undesirable and hardly defensible. We may have no choice but to agree to disagree, it could be a sterile discussion to keep at it, and addressing KIP-63 and other issues is more important than my brief observation.

What follows supports my point of view that the filter method is not behaving as expected, and I still think it's a defect; however, I am guarded in my observation, admitting my status of "total newbie" at stream processing and Kafka.

If we rewrite the code snippet I provided from

    KTable<String, String> regionCounts = userRegions
        .groupBy((userId, region) -> KeyValue.pair(region, region))
        .count("CountsByRegion")
        .filter((regionName, count) -> false)
        .mapValues(count -> count.toString());

to

    KTable<String, Long> regionCounts1 = userRegions
        .groupBy((userId, region) -> KeyValue.pair(region, region))
        .count("CountsByRegion");

    KTable<String, String> regionCounts = regionCounts1
        .filter((regionName, count) -> false)
        .mapValues(count -> count.toString());

it becomes clear that regionCounts1 could build up plenty of keys with valid Long counts, which is normal behaviour (I think you call this a node in the topology in KIP-63, and regionCounts is a successor node).

These regionCounts1 keys are then exposed as input to the evaluation of KTable regionCounts. But why should any key be created in KTable regionCounts for which the filter is false? In other words, the "optimization" seems really compelling here: do not create a key before that key becomes relevant. The key with a null value is valid and relevant in regionCounts1, but not in regionCounts. By a programming-composition argument, the original block of code I presented should be equivalent to the two blocks it is broken down into here (and I guess that's saying one unified node in the topology should be equivalent to the chain of two nodes above, if I understand the terminology right).

From a functional programming point of view, the contents of regionCounts should not change depending on the set of keys present in regionCounts1 (it's as if we were carrying garbage-collected objects into regionCounts), which seems natural considering that the filter method is pervasive in FP.

Here regionCounts is totally oblivious to the aggregation that took place previously in regionCounts1, and that's fine (KIP-63 talks a lot about aggregation, but I don't really care about that; I care about the second node and the behaviour of filter).
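For reference, the filter rules in Guozhang's Jun 24 reply (case 2, and cases 3a to 3c, the optimization later tracked as KAFKA-3902) can be sketched as a small, Kafka-free Java program. This is an illustration under stated assumptions, not KTableFilter's actual code: `Change`, `filterValueOnly` and `filterChange` are hypothetical names, `null` stands for "no value", and `Optional.empty()` means "forward nothing downstream".

```java
import java.util.Optional;
import java.util.function.BiPredicate;

public class FilterChangeSketch {

    // Hypothetical stand-in for the {old -> new} pair a materialized KTable forwards.
    record Change<V>(V oldValue, V newValue) {}

    // Case 2: old values are not sent, so the filter cannot know whether downstream
    // holds the key; a failing (or null) value must be forwarded as a tombstone (null).
    static <K, V> V filterValueOnly(K key, V newValue, BiPredicate<K, V> predicate) {
        return (newValue != null && predicate.test(key, newValue)) ? newValue : null;
    }

    // Case 3: old values are sent. Returns Optional.empty() when nothing needs to go downstream.
    static <K, V> Optional<Change<V>> filterChange(K key, Change<V> change, BiPredicate<K, V> predicate) {
        V oldV = change.oldValue();
        V newV = change.newValue();
        // A null side means "no value", so it never passes the predicate.
        boolean oldPasses = oldV != null && predicate.test(key, oldV);
        boolean newPasses = newV != null && predicate.test(key, newV);
        if (!oldPasses && !newPasses) {
            // 3a and 3b with a failing predicate, and 3c-2: downstream never saw a value
            // for this key (or already saw it filtered out), so suppress the record.
            return Optional.empty();
        }
        // 3c-1 keeps both sides, 3c-3 becomes {old -> null}, 3c-4 becomes {null -> new};
        // the passing forms of 3a and 3b are forwarded unchanged.
        return Optional.of(new Change<>(oldPasses ? oldV : null, newPasses ? newV : null));
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> lessThanTwo = (k, v) -> v < 2; // the "value < 2" filter
        System.out.println(filterValueOnly("a", 3, lessThanTwo));                   // null
        System.out.println(filterChange("a", new Change<>(null, 1), lessThanTwo)); // Optional[Change[oldValue=null, newValue=1]]
        System.out.println(filterChange("a", new Change<>(1, 3), lessThanTwo));    // Optional[Change[oldValue=1, newValue=null]]
        System.out.println(filterChange("b", new Change<>(null, 2), lessThanTwo)); // Optional.empty
    }
}
```

The sketch makes one trade-off visible: suppression is only possible because the old value is at hand, which is exactly the materialization cost Guozhang weighs in his Jun 27 reply.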
On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Philippe,

I think your question is really two-fold:

1. What is the semantic difference between a KTable and a KStream, and more specifically, how should we interpret (key, null) in a KTable?

You can find some explanations in this documentation:

http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream

Note that a KTable itself is still a stream behind the scenes, although it may be materialized when necessary. Specifically, to your question: (key, null) can be treated as a tombstone on the specified key, and when this KTable stream is materialized, it will result in a "delete" on the materialized view.

2. As for the "filter" operator, yes, it will generate a large number of (key, null) records, which indicate "delete" in the resulting KTable, and hence large traffic to the piped topic.
But we are > working > > on > > > > > > KIP-63 > > > > > > > > > which > > > > > > > > > > unifies the caching mechanism in the `KTable.to` operator > > as > > > > well > > > > > > so > > > > > > > > that > > > > > > > > > > de-duping can be done in this operator and hence the > > outgoing > > > > > > traffic > > > > > > > > can > > > > > > > > > > be largely reduced: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome < > > > > > > phder...@gmail.com > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > I made a modification of latest Confluent's example > > > > > > > > > > > UserRegionLambdaExample. See relevant code at end of > > email. > > > > > > > > > > > > > > > > > > > > > > Am I correct in understanding that KTable semantics > > should > > > be > > > > > > > similar > > > > > > > > > to > > > > > > > > > > a > > > > > > > > > > > store-backed cache of a view as (per wikipedia on > > > > materialized > > > > > > > views) > > > > > > > > > or > > > > > > > > > > > similar to Oracle's materialized views and indexed > views? > > > > More > > > > > > > > > > > specifically, I am looking at when a (key, null value) > > pair > > > > can > > > > > > > make > > > > > > > > it > > > > > > > > > > > into KTable on generating table from a valid KStream > > with a > > > > > false > > > > > > > > > filter. 
Here's the relevant code, modified from the example, for which I observed that all keys within userRegions are sent out to topic LargeRegions with a null value. I would think that both the regionCounts KTable and the topic LargeRegions should be empty, so that the cached view agrees with the intended query (a query with an intentionally empty result set, as the filter 1 >= 2 is intentionally always false).

I am not sure I understand the implications properly, as I am new, but it seems possible that a highly selective filter on a large incoming stream would result in high memory usage for regionCounts and hence for the stream application.
    KTable<String, String> regionCounts = userRegions
        // Count by region
        // We do not need to specify any explicit serdes because the key
        // and value types do not change
        .groupBy((userId, region) -> KeyValue.pair(region, region))
        .count("CountsByRegion")
        // discard any regions FOR SAKE OF EXAMPLE
        .filter((regionName, count) -> 1 >= 2)
        .mapValues(count -> count.toString());

    KStream<String, String> regionCountsForConsole = regionCounts.toStream();

    regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
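The behaviour Philippe reports for the code above (every region reaching LargeRegions with a null value) can be reproduced by a small Kafka-free Java simulation of case 2 from Guozhang's Jun 24 reply. For simplicity it assumes each input element is a new user's region; the real userRegions is a KTable, whose region re-assignments the grouped count would also subtract. `Output` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class AlwaysFalseFilterSketch {

    // Hypothetical record standing in for a (key, value) written to "LargeRegions".
    record Output(String key, String value) {}

    // Simulates count-by-region followed by filter(...).mapValues(toString) with old
    // values NOT enabled: each count update that fails the predicate is forwarded as a
    // tombstone (region, null) instead of being dropped.
    static List<Output> run(List<String> newUserRegions, BiPredicate<String, Long> predicate) {
        Map<String, Long> counts = new HashMap<>(); // stand-in for the "CountsByRegion" store
        List<Output> sentToTopic = new ArrayList<>();
        for (String region : newUserRegions) {
            long count = counts.merge(region, 1L, Long::sum);
            // mapValues only converts non-null values; a filtered-out update stays null.
            String value = predicate.test(region, count) ? Long.toString(count) : null;
            sentToTopic.add(new Output(region, value));
        }
        return sentToTopic;
    }

    public static void main(String[] args) {
        // The thread's "discard everything" predicate: 1 >= 2 is always false.
        List<Output> out = run(List.of("europe", "asia", "europe"), (region, count) -> 1 >= 2);
        out.forEach(System.out::println);
        // Output[key=europe, value=null]
        // Output[key=asia, value=null]
        // Output[key=europe, value=null]
    }
}
```

With old values available (case 3a), each of these records could instead be suppressed, because no value for those keys was ever forwarded; the price is materializing the upstream table, which is the concern Guozhang raises on Jun 27.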