Guozhang, my latest commit proposes changing the semantics of case 2 in your JIRA slightly: nulls are suppressed when old values are not being sent and the table is not materialized. When a table T2 is first created from another table T1 and the filter does not match key k from T1, k does not enter T2 at all (no null is forwarded). The code change ends up simpler and the test results look more intuitive.
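To make the difference concrete, here is a rough plain-Java sketch of the two behaviours for a filter that does not send old values. It is only a model, not the code in the commit: the class and method names (`FilterNullSketch`, `forwardNull`, `suppressNull`) are made up for illustration, nothing in it is Kafka Streams API, and treating an incoming null value as "does not match" is my assumption.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;

// Plain-Java model of a table filter that does not send old values.
// All names are hypothetical; this is not Kafka Streams code.
final class FilterNullSketch {

    // Case 2 as described in the thread: a record that fails the predicate
    // is still forwarded, as <key: null>.
    static <K, V> Optional<Map.Entry<K, V>> forwardNull(
            K key, V value, BiPredicate<K, V> predicate) {
        // Assumption: an incoming null value counts as "does not match".
        boolean matches = value != null && predicate.test(key, value);
        Map.Entry<K, V> out = new SimpleImmutableEntry<>(key, matches ? value : null);
        return Optional.of(out);
    }

    // Proposed variant: a record that fails the predicate is dropped,
    // so the key never reaches the downstream table.
    static <K, V> Optional<Map.Entry<K, V>> suppressNull(
            K key, V value, BiPredicate<K, V> predicate) {
        if (value != null && predicate.test(key, value)) {
            Map.Entry<K, V> out = new SimpleImmutableEntry<>(key, value);
            return Optional.of(out);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        BiPredicate<String, Long> alwaysFalse = (k, v) -> false;
        System.out.println(forwardNull("k", 1L, alwaysFalse));
        System.out.println(suppressNull("k", 1L, alwaysFalse));
    }
}
```

Your `value < 2` example (an `<a: 1>` followed by `<a: 3>`) is a useful input to run through both variants, since that is the situation your case 2 was written for.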
On Wed, Jun 29, 2016 at 6:55 PM, Philippe Derome <phder...@gmail.com> wrote:

> good.
>
> On Wed, Jun 29, 2016 at 6:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Yes, they are related in the sense that if we always materialize a
>> source KTable, then we can completely replace the `sendOldValues` as it
>> will always be true. But since 3911 is a rather big change, I'd prefer
>> to complete this ticket first, and refactor it when we decide to work
>> on 3911 later.
>>
>> Feel free to link these two tickets though.
>>
>> Guozhang
>>
>> On Tue, Jun 28, 2016 at 9:47 AM, Philippe Derome <phder...@gmail.com>
>> wrote:
>>
>> > Is this point of view consistent with new ticket 3911 (Enforce KTable
>> > materialisation) just submitted by Eno T.?
>> >
>> > Should the two tickets be linked somehow if they are related?
>> >
>> > My concern is that the overhead of requesting the source KTable to be
>> > materialized (i.e. creating a state store, and sending the
>> > {old -> new} pair instead of the new value only) may be overwhelming
>> > compared with its potential benefits of reducing the downstream
>> > traffic.
>> >
>> > Guozhang
>> >
>> > On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome <phder...@gmail.com>
>> > wrote:
>> >
>> > > Guozhang,
>> > >
>> > > would you say it's advisable to initialize
>> > > KTableFilter.sendOldValues to true instead of false? That's what I
>> > > see that can trigger your described case 3 to potentially desirable
>> > > effect, but I didn't include it into pull request. If left to
>> > > default value of false, I don't know what mechanism should override
>> > > it to true.
>> > >
>> > > Phil
>> > >
>> > > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks! You can follow this step-by-step guidance to contribute to
>> > > > Kafka via github.
>> > > >
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome
>> > > > <phder...@gmail.com> wrote:
>> > > >
>> > > > > I have a 1 liner solution for this in KTableFilter.java and
>> > > > > about 5-6 lines changes to existing unit test
>> > > > > KTableFilterTest.testSendingOldValue. I included those lines
>> > > > > with context in the JIRA. I am struggling a bit with github
>> > > > > being new to it and how to do a proper pull request so hopefully
>> > > > > that can be followed up by you? I had the streams test suite
>> > > > > pass aside for a few cases that pertain specifically to this
>> > > > > JIRA as assumptions have now changed.
>> > > > >
>> > > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang
>> > > > > <wangg...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Philippe,
>> > > > > >
>> > > > > > Great, since you agree with my reasonings, I have created a
>> > > > > > JIRA ticket for optimizing KTableFilter (feel free to pick it
>> > > > > > up if you are interested in contributing):
>> > > > > >
>> > > > > > https://issues.apache.org/jira/browse/KAFKA-3902
>> > > > > >
>> > > > > > About case 3-c-1), what I meant is that since "predicate
>> > > > > > return true on both", the resulted pair would just be the same
>> > > > > > as the original pair.
>> > > > > >
>> > > > > > About KIP-63, itself is a rather big story, but it has one
>> > > > > > correspondence to this JIRA: with caching you can dedup some
>> > > > > > records with the same key, for example if the input records to
>> > > > > > the KTable are:
>> > > > > >
>> > > > > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
>> > > > > >
>> > > > > > And the KTable is materialized into a state store with cache
>> > > > > > on top of it, then the resulted downstream could be:
>> > > > > >
>> > > > > > <a: {null -> 1}>, <a: {1 -> 6}> ...
>> > > > > >
>> > > > > > Instead of
>> > > > > >
>> > > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ...
>> > > > > > <a: {5 -> 6}> ...
>> > > > > >
>> > > > > > So if it is piped to a filter() operator, then even less data
>> > > > > > will be produced.
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome
>> > > > > > <phder...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Yes, it looks very good. Your detailed explanation appears
>> > > > > > > compelling enough to reveal that some of the details of the
>> > > > > > > complexity of a streams system are probably inherent
>> > > > > > > complexity (not that I dared assume it was "easy" but I
>> > > > > > > could afford to be conveniently unaware). It took me 30
>> > > > > > > minutes to grasp this latest response.
>> > > > > > >
>> > > > > > > There might be a typo in your email for case 3.c.1) as I
>> > > > > > > would think we should send the most recent pair as opposed
>> > > > > > > to original, in any event it does not materially impact your
>> > > > > > > presentation.
>> > > > > > >
>> > > > > > > Your case 3a) is really what triggered my line of
>> > > > > > > questioning and I found the current behaviour vexing as it
>> > > > > > > may lead to some undesirable and necessary filter (see
>> > > > > > > Michael G. Noll's fix in UserRegionLambdaExample at the very
>> > > > > > > end trying to weed out null) used to output to topic to
>> > > > > > > console. Without looking at design, it seemed self-evident
>> > > > > > > to me that the 3a) behaviour had to be implemented (from my
>> > > > > > > point of view with the code example I was looking at, it
>> > > > > > > simply means never say to delete a key that was never
>> > > > > > > created, simply don't "create a deleted" key).
>> > > > > > >
>> > > > > > > Likewise cases 3 b,c look very reasonable.
>> > > > > > >
>> > > > > > > Just out of curiosity, did you effectively just restate the
>> > > > > > > essence of KIP-63 in a more approachable language I could
>> > > > > > > understand or is KIP-63 really a different beast?
>> > > > > > >
>> > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang
>> > > > > > > <wangg...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hello Philippe,
>> > > > > > > >
>> > > > > > > > Very good points, let me dump my thoughts about
>> > > > > > > > "KTable.filter" specifically and how we can improve on
>> > > > > > > > that:
>> > > > > > > >
>> > > > > > > > 1. Some context: when a KTable participates in a
>> > > > > > > > downstream operator (e.g. if that operator is an
>> > > > > > > > aggregation), then we need to materialize this KTable and
>> > > > > > > > send both its old value as well as new value as a pair
>> > > > > > > > {old -> new} to the downstream operator. In practice it
>> > > > > > > > usually needs to send the pair.
>> > > > > > > >
>> > > > > > > > So let's discuss about them separately, take the following
>> > > > > > > > example source stream for your KTable
>> > > > > > > >
>> > > > > > > > <a: 1>, <b: 2>, <a: 3> ...
>> > > > > > > >
>> > > > > > > > When the KTable needs to be materialized, it will
>> > > > > > > > transform the source messages into the pairs of:
>> > > > > > > >
>> > > > > > > > <a: {null -> 1}>, <b: {null -> 2}>, <a: {1 -> 3}>
>> > > > > > > >
>> > > > > > > > 2. If "send old value" is not enabled, then when the
>> > > > > > > > filter predicate returns false, we MUST send a <key: null>
>> > > > > > > > to the downstream operator to indicate that this key is
>> > > > > > > > being filtered in the table. Otherwise, for example if
>> > > > > > > > your filter is "value < 2", then the updated value <a: 3>
>> > > > > > > > will just be filtered, resulting in incorrect semantics.
>> > > > > > > >
>> > > > > > > > If it returns true we should still send the original
>> > > > > > > > <key: value> to downstream operators.
>> > > > > > > >
>> > > > > > > > 3. If "send old value" is enabled, then there are a couple
>> > > > > > > > of cases we can consider:
>> > > > > > > >
>> > > > > > > >     a. If old value is <key: null> and new value is
>> > > > > > > > <key: not-null>, and the filter predicate returns false
>> > > > > > > > for the new value, then in this case it is safe to
>> > > > > > > > optimize and not return anything to the downstream
>> > > > > > > > operator, since in this case we know there is no value for
>> > > > > > > > the key previously anyways; otherwise we send the original
>> > > > > > > > pair.
>> > > > > > > >
>> > > > > > > >     b. If old value is <key: not-null> and new value is
>> > > > > > > > <key: null>, indicating to delete this key, and the filter
>> > > > > > > > predicate returns false for the old value, then in this
>> > > > > > > > case it is safe to optimize and not return anything to the
>> > > > > > > > downstream operator, since we know that the old value has
>> > > > > > > > already been filtered in a previous message; otherwise we
>> > > > > > > > send the original pair.
>> > > > > > > >
>> > > > > > > >     c. If both old and new values are not null, and:
>> > > > > > > >
>> > > > > > > >       1) predicate returns true on both, send the original
>> > > > > > > >          pair;
>> > > > > > > >
>> > > > > > > >       2) predicate returns false on both, we can optimize
>> > > > > > > >          and do not send anything;
>> > > > > > > >
>> > > > > > > >       3) predicate returns true on old and false on new,
>> > > > > > > >          send the key: {old -> null};
>> > > > > > > >
>> > > > > > > >       4) predicate returns false on old and true on new,
>> > > > > > > >          send the key: {null -> new};
>> > > > > > > >
>> > > > > > > > Does this sound good to you?
>> > > > > > > >
>> > > > > > > > Guozhang
>> > > > > > > >
>> > > > > > > > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome
>> > > > > > > > <phder...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Thanks a lot for the detailed feedback, its clarity and
>> > > > > > > > > the reference to KIP-63, which however is for the most
>> > > > > > > > > part above my head for now.
>> > > > > > > > >
>> > > > > > > > > Having said that, I still hold the view that the
>> > > > > > > > > behaviour I presented is undesirable and hardly
>> > > > > > > > > defensible and we may have no choice but to agree to
>> > > > > > > > > disagree and it could be a sterile discussion to keep at
>> > > > > > > > > it and addressing KIP-63 and other issues are more
>> > > > > > > > > important than my brief observation.
>> > > > > > > > >
>> > > > > > > > > What follows supports my point of view that the filter
>> > > > > > > > > method is not behaving as expected and I'd still think
>> > > > > > > > > it's a defect, however I am guarded with my observation
>> > > > > > > > > admitting my status of "total newbie" at stream
>> > > > > > > > > processing and Kafka.
>> > > > > > > > >
>> > > > > > > > > If we rewrite the code snippet I provided from
>> > > > > > > > >
>> > > > > > > > >     KTable<String, String> regionCounts = userRegions
>> > > > > > > > >         .groupBy((userId, region) -> KeyValue.pair(region, region))
>> > > > > > > > >         .count("CountsByRegion")
>> > > > > > > > >         .filter((regionName, count) -> false)
>> > > > > > > > >         .mapValues(count -> count.toString());
>> > > > > > > > >
>> > > > > > > > > to
>> > > > > > > > >
>> > > > > > > > >     KTable<String, Long> regionCounts1 = userRegions
>> > > > > > > > >         .groupBy((userId, region) -> KeyValue.pair(region, region))
>> > > > > > > > >         .count("CountsByRegion");
>> > > > > > > > >
>> > > > > > > > >     KTable<String, String> regionCounts = regionCounts1
>> > > > > > > > >         .filter((regionName, count) -> false)
>> > > > > > > > >         .mapValues(count -> count.toString());
>> > > > > > > > >
>> > > > > > > > > It becomes clear that regionCounts1 could build up
>> > > > > > > > > plenty of keys with valid Long counts, normal behaviour
>> > > > > > > > > (I think you call this a node in the topology in KIP-63
>> > > > > > > > > and regionCounts is a successor node).
>> > > > > > > > >
>> > > > > > > > > These regionCounts1 keys are then exposed to evaluation
>> > > > > > > > > of KTable regionCounts as an input. But why should there
>> > > > > > > > > be any key created in KTable regionCounts that has a
>> > > > > > > > > false filter? In other words, the "optimization" seems
>> > > > > > > > > really compelling here: do not create a key before that
>> > > > > > > > > key becomes relevant. The key with a null value is valid
>> > > > > > > > > and relevant in regionCounts1 but not regionCounts. By a
>> > > > > > > > > programming composition argument, the original block of
>> > > > > > > > > code I presented should be equivalent to the broken down
>> > > > > > > > > one in two blocks here (and I guess that's saying 1
>> > > > > > > > > unified node in the topology should be equivalent to a
>> > > > > > > > > chain of 2 nodes represented below if I understand the
>> > > > > > > > > terminology right).
>> > > > > > > > >
>> > > > > > > > > The contents of regionCounts should not change depending
>> > > > > > > > > on the set of keys present in regionCounts1 if we view
>> > > > > > > > > this from a functional programming point of view (it's
>> > > > > > > > > as if we are carrying garbage collected objects into
>> > > > > > > > > regionCounts), which seems natural considering the
>> > > > > > > > > method filter that is pervasive in FP.
>> > > > > > > > >
>> > > > > > > > > Here regionCounts is totally oblivious that aggregation
>> > > > > > > > > took place previously in regionCounts1 and that's fine
>> > > > > > > > > (KIP-63 talks much about aggregation but I don't really
>> > > > > > > > > care about, I care about the 2nd node and the behaviour
>> > > > > > > > > of filter).
>> > > > > > > > >
>> > > > > > > > > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang
>> > > > > > > > > <wangg...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hello Philippe,
>> > > > > > > > > >
>> > > > > > > > > > I think your question is really in two-folds:
>> > > > > > > > > >
>> > > > > > > > > > 1. What is the semantic difference between a KTable
>> > > > > > > > > > and a KStream, and more specifically how should we
>> > > > > > > > > > interpret (key, null) in KTable?
>> > > > > > > > > >
>> > > > > > > > > > You can find some explanations in this documentation:
>> > > > > > > > > >
>> > > > > > > > > > http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
>> > > > > > > > > >
>> > > > > > > > > > Note that KTable itself is still a stream behind the
>> > > > > > > > > > scene, although it may be materialized when necessary.
>> > > > > > > > > > And specifically to your question, (key, null) can be
>> > > > > > > > > > treated as a tombstone on the specified key, and when
>> > > > > > > > > > this KTable stream is materialized, it will result in
>> > > > > > > > > > a "delete" on materialized view.
>> > > > > > > > > >
>> > > > > > > > > > 2. As for the "filter" operator, yes it will generate
>> > > > > > > > > > a large amount of (key, null) records which indicates
>> > > > > > > > > > "delete" in the resulted KTable, and hence large
>> > > > > > > > > > traffic to the piped topic. But we are working on
>> > > > > > > > > > KIP-63 which unifies the caching mechanism in the
>> > > > > > > > > > `KTable.to` operator as well so that de-duping can be
>> > > > > > > > > > done in this operator and hence the outgoing traffic
>> > > > > > > > > > can be largely reduced:
>> > > > > > > > > >
>> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
>> > > > > > > > > >
>> > > > > > > > > > Guozhang
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome
>> > > > > > > > > > <phder...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I made a modification of latest Confluent's example
>> > > > > > > > > > > UserRegionLambdaExample. See relevant code at end of
>> > > > > > > > > > > email.
>> > > > > > > > > > >
>> > > > > > > > > > > Am I correct in understanding that KTable semantics
>> > > > > > > > > > > should be similar to a store-backed cache of a view
>> > > > > > > > > > > as (per wikipedia on materialized views) or similar
>> > > > > > > > > > > to Oracle's materialized views and indexed views?
>> > > > > > > > > > > More specifically, I am looking at when a (key, null
>> > > > > > > > > > > value) pair can make it into KTable on generating
>> > > > > > > > > > > table from a valid KStream with a false filter.
>> > > > > > > > > > >
>> > > > > > > > > > > Here's relevant code modified from example for which
>> > > > > > > > > > > I observed that all keys within userRegions are sent
>> > > > > > > > > > > out to topic LargeRegions with a null value. I would
>> > > > > > > > > > > think that both regionCounts KTable and topic
>> > > > > > > > > > > LargeRegions should be empty so that the cached view
>> > > > > > > > > > > agrees with the intended query (a query with an
>> > > > > > > > > > > intentional empty result set as the filter is
>> > > > > > > > > > > intentionally false as 1 >= 2).
>> > > > > > > > > > >
>> > > > > > > > > > > I am not sure I understand implications properly as
>> > > > > > > > > > > I am new but it seems possible that a highly
>> > > > > > > > > > > selective filter from a large incoming stream would
>> > > > > > > > > > > result in high memory usage for regionCounts and
>> > > > > > > > > > > hence the stream application.
>> > > > > > > > > > >
>> > > > > > > > > > >     KTable<String, String> regionCounts = userRegions
>> > > > > > > > > > >         // Count by region
>> > > > > > > > > > >         // We do not need to specify any explicit serdes because
>> > > > > > > > > > >         // the key and value types do not change
>> > > > > > > > > > >         .groupBy((userId, region) -> KeyValue.pair(region, region))
>> > > > > > > > > > >         .count("CountsByRegion")
>> > > > > > > > > > >         // discard any regions FOR SAKE OF EXAMPLE
>> > > > > > > > > > >         .filter((regionName, count) -> 1 >= 2)
>> > > > > > > > > > >         .mapValues(count -> count.toString());
>> > > > > > > > > > >
>> > > > > > > > > > >     KStream<String, String> regionCountsForConsole =
>> > > > > > > > > > >         regionCounts.toStream();
>> > > > > > > > > > >
>> > > > > > > > > > >     regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > --
>> > > > > > > > > > -- Guozhang
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > -- Guozhang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>> --
>> -- Guozhang
>>
>
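
For reference, the {old -> new} pairs and the case 3 rules quoted above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the KTableFilter code: `ChangePairSketch`, `Change`, `materialize` and `filter` are hypothetical names, the `HashMap` only stands in for a state store, and collapsing cases 3a, 3b and 3c into one rule (null out whichever side fails the predicate, send nothing if both sides end up null) is my own restatement of the list.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

// Plain-Java model of {old -> new} pairs and the case 3 filter rules.
// All names are hypothetical; this is not Kafka Streams code.
final class ChangePairSketch {

    // An {old -> new} pair for one key; either side may be null.
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Change)) return false;
            Change<?> other = (Change<?>) o;
            return Objects.equals(oldValue, other.oldValue)
                    && Objects.equals(newValue, other.newValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(oldValue, newValue);
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    // Materializing: keep the latest value per key (the map stands in for a
    // state store) and emit the previous value together with the new one.
    static <K, V> Change<V> materialize(Map<K, V> store, K key, V newValue) {
        V old = (newValue == null) ? store.remove(key) : store.put(key, newValue);
        return new Change<V>(old, newValue);
    }

    // Case 3: null out each side that is absent or fails the predicate;
    // if nothing is left on either side, there is nothing to send.
    static <V> Optional<Change<V>> filter(Change<V> change, Predicate<V> predicate) {
        V oldOut = passes(change.oldValue, predicate) ? change.oldValue : null;
        V newOut = passes(change.newValue, predicate) ? change.newValue : null;
        if (oldOut == null && newOut == null) {
            return Optional.empty();
        }
        return Optional.of(new Change<V>(oldOut, newOut));
    }

    private static <V> boolean passes(V value, Predicate<V> predicate) {
        return value != null && predicate.test(value);
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        Predicate<Integer> lessThanTwo = v -> v < 2;
        // Source stream from the thread: <a: 1>, <b: 2>, <a: 3>
        System.out.println(filter(materialize(store, "a", 1), lessThanTwo));
        System.out.println(filter(materialize(store, "b", 2), lessThanTwo));
        System.out.println(filter(materialize(store, "a", 3), lessThanTwo));
    }
}
```

With the `value < 2` predicate from case 2, the third record `<a: {1 -> 3}>` falls under case 3c-3, so the filter forwards `{1 -> null}` rather than dropping the update.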