Oh ok, I had assumed that distinguishing cases 1 and 2 was possible. Thanks.

On 7 Jul 2016 1:02 p.m., "Guozhang Wang" <wangg...@gmail.com> wrote:
> Hello,
>
> The problem is that, in stream processing, when a new record arrives from the big table tB and finds no match in the small table tA (which in practice holds only a subset of the keys), you cannot tell which of the following cases is true:
>
> 1. There was a matching record in tA, but it was deleted before this tB record arrived. In this case we need to output a null record to "negate" the previous join result.
> 2. There was never a matching record in tA. In this case we do not actually need to output anything.
>
> As I mentioned, the root cause is that tables in stream processing can be updated while processing is ongoing. As for the Kafka Streams implementation, if a downstream operator receives a null record for a key that has no existing record in its materialized view, the null record is treated as a no-op.
>
>
> Guozhang
>
>
> On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome <phder...@gmail.com> wrote:
>
> > Thanks, I understand that it's not modelled the same way as database joins.
> >
> > Let's take an example: an inner join of a very small population set (NBA rookies or US senators) with a larger table (data on zip codes). Let's assume we want to identify the crime rate of the zip codes where current senators live, or the median income of the zip codes where NBA rookies currently live. These small elite populations will likely never live in the poorest 50% of US zip codes (although a few exceptionally might), and NBA rookies will not live far from their team's home base (so not in Maine, Alaska, Hawaii, or North Dakota), so many zip codes will not match and are expected never to match. So I don't see how the keys representing such zip codes would become eventually consistent.
> >
> > One can imagine an application that makes use of census data (with many zip codes) and is interested in many such statistics for several such small "elite" populations; the irrelevant zip codes, with their null records, then find their way into the data pipeline multiple times.
> >
> > I cannot think of a more egregious example than one where one table has billions of keys and the other only a handful that would match, but I'd assume that such use cases could be natural.
> >
> > It seems to me that (key, null) records should be output to represent a record deletion in the resulting table, but not a near miss on data selection.
> >
> > On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > The KTable join semantics are not exactly the same as those of an RDBMS. You can find the detailed semantics in the web docs (search for "Joining Streams"):
> > >
> > > http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl
> > >
> > > In a nutshell, the joiner is triggered only if both (inner join) / the left (left join) / either (outer join) of the joining streams has a matching record for the key of the incoming record, so the joiner's input values cannot be null / can be null only for the other value / can be null for either value but not both. Otherwise a {join-key, null} pair is output. We made this design choice deliberately to make sure that "table-table joins are eventually consistent". This gives a kind of resilience to late-arriving records: a late arrival in either stream can "update" the join result.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome <phder...@gmail.com> wrote:
> > >
> > > > The same happens for a regular join: keys that appear in only one stream make it into the output KTable tC with a null in place of the missing input.
> > > > I guess it's related to KAFKA-3911 (Enforce KTable materialization) or the umbrella JIRA KAFKA-3909 (Queryable state for Kafka Streams)?
> > > >
> > > > On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome <phder...@gmail.com> wrote:
> > > >
> > > > > If we have two streams A and B with associated tables tA and tB, then create a table tC as tA.leftJoin(tB, <some value joiner>), and then have a key kB in stream B that never made it into tA or tC, do we need to inject a (k, v) pair of (kB, null) into the resulting change log for tC?
> > > > >
> > > > > It definitely seems necessary if key kB is present in table tC, but if not, why add it?
> > > > >
> > > > > I have an example that reproduces this and would like to know whether it is considered normal, sub-optimal, or a defect. I don't view it as normal for the time being, particularly when stream A has very few keys and B has many, which could lead to an unnecessarily large change log for tC.
> > > > >
> > > > > Phil
> > >
> > >
> > > --
> > > -- Guozhang
>
>
> --
> -- Guozhang
>
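To make the ambiguity between cases 1 and 2 concrete, here is a minimal, self-contained Java sketch. It is a toy in-memory model, not the Kafka Streams API: the class `ToyLeftJoin`, its `updateA`/`updateB` methods, the `Change` record, and the `noOpTombstones` counter are all hypothetical names. It assumes the left-join rule described in the thread (the joiner fires only when tA has a record for the key, otherwise a (key, null) pair is output) and a downstream materialized view that treats a null for an absent key as a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical toy model of tC = tA.leftJoin(tB, joiner); not the Kafka Streams API. */
public class ToyLeftJoin {
    /** One change-log entry for tC; a null value is a tombstone. */
    public record Change(String key, String value) {}

    private final Map<String, String> tA = new HashMap<>();
    private final Map<String, String> tB = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    /** Everything the join operator emits, including redundant nulls. */
    public final List<Change> changeLog = new ArrayList<>();
    /** Downstream materialized view of tC. */
    public final Map<String, String> viewC = new HashMap<>();
    /** Nulls that reached the view for a key it did not hold (no-ops). */
    public int noOpTombstones = 0;

    public ToyLeftJoin(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    public void updateA(String key, String value) {
        if (value == null) { tA.remove(key); } else { tA.put(key, value); }
        emit(key);
    }

    public void updateB(String key, String value) {
        if (value == null) { tB.remove(key); } else { tB.put(key, value); }
        emit(key);
    }

    private void emit(String key) {
        String a = tA.get(key);
        // The operator looks only at the current tA state, not at its own past
        // output, so every "no tA match" produces the same (key, null) record.
        Change change = new Change(key, a == null ? null : joiner.apply(a, tB.get(key)));
        changeLog.add(change);
        applyDownstream(change);
    }

    private void applyDownstream(Change change) {
        if (change.value() != null) {
            viewC.put(change.key(), change.value());
        } else if (viewC.containsKey(change.key())) {
            viewC.remove(change.key()); // retracts an earlier join result
        } else {
            noOpTombstones++;           // nothing to retract: treated as a no-op
        }
    }

    public static void main(String[] args) {
        ToyLeftJoin join = new ToyLeftJoin((a, b) -> a + "|" + b);
        join.updateA("k1", "a1");  // (k1, "a1|null")
        join.updateB("k1", "b1");  // (k1, "a1|b1")
        join.updateB("k2", "b2");  // k2 never in tA: (k2, null), a no-op downstream
        join.updateA("k1", null);  // tA record deleted: (k1, null) retracts the result
        join.updateB("k1", "b1x"); // no tA match again: (k1, null), now a no-op
        System.out.println(join.changeLog.size() + " changes, "
                + join.noOpTombstones + " no-op nulls, view=" + join.viewC);
    }
}
```

Running `main` should print `5 changes, 2 no-op nulls, view={}`. All three null records look identical to a consumer of the change log; only the view, by checking whether it holds the key, can tell the retraction from the two no-ops, and the redundant nulls still occupy the change log, which is the growth Phil describes. Suppressing them inside the join would require the operator to remember its previous output per key, in effect materializing tC; the sketch does not attempt that.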