Hi John & Guozhang,

I have cleaned up the KIP, pruned much of what I wrote, and put a simplified diagram near the top to illustrate the workflow. I encapsulated Jan's content at the bottom of the document. I believe it is far simpler to read now.
@Guozhang Wang <wangg...@gmail.com>:

> #1: rekey left table
>    -> source from the left upstream, send to rekey-processor to generate combined key, and then sink to copartition topic.

Correct.

> #2: first-join with right table
>    -> source from the right table upstream, materialize the right table.
>    -> source from the co-partition topic, materialize the rekeyed left table, join with the right table, rekey back, and then sink to the rekeyed-back topic.

Almost - I cleared up the KIP. We do not rekey back yet, as I need the Foreign-Key value generated in #1 above to compare in the resolution stage.

> #3: second join
>    -> source from the rekeyed-back topic, materialize the rekeyed back table.
>    -> source from the left upstream, materialize the left table, join with the rekeyed back table.

Almost - As each event comes in, we just run it through a stateful processor that checks the original ("This") KTable for the key. The value payload then has the foreignKeyExtractor applied again as in Part #1 above, and gets the current foreign key. Then we compare it to the joined event that we are currently resolving. If they have the same foreign-key, propagate the result out. If they don't, throw the event away.

The end result is that we do need to materialize 2 additional tables (left/this-combinedkey table, and the final Joined table) as I've illustrated in the updated KIP. I hope the diagram clears it up a lot better. Please let me know.
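To make the resolution check concrete, here is a minimal sketch of the comparison described above. It is an illustration under stated assumptions, not the code in the PR: the class name `ResolverSketch`, the method `resolve`, and the use of a plain `Map` in place of the materialized "This" KTable are all hypothetical. Only `foreignKeyExtractor` is a name taken from the discussion. How a deleted left-side row should be handled is also an assumption here (the event is simply discarded).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the resolution stage. A HashMap stands in for the
// materialized left ("This") KTable; this is not the Kafka Streams API.
final class ResolverSketch<K, V, KO, VR> {
    private final Map<K, V> leftTable;
    private final Function<V, KO> foreignKeyExtractor;

    ResolverSketch(Map<K, V> leftTable, Function<V, KO> foreignKeyExtractor) {
        this.leftTable = leftTable;
        this.foreignKeyExtractor = foreignKeyExtractor;
    }

    // Propagates the joined value only if the foreign key carried by the
    // joined event equals the foreign key currently stored for primaryKey.
    Optional<VR> resolve(K primaryKey, KO eventForeignKey, VR joinedValue) {
        V current = leftTable.get(primaryKey);
        if (current == null) {
            // Assumption: no current left-side row means the event is stale.
            return Optional.empty();
        }
        KO currentForeignKey = foreignKeyExtractor.apply(current);
        if (!currentForeignKey.equals(eventForeignKey)) {
            return Optional.empty(); // foreign key changed since the rekey: throw away
        }
        return Optional.ofNullable(joinedValue);
    }

    public static void main(String[] args) {
        Map<Integer, String> left = new HashMap<>();
        left.put(1, "B|xyz"); // key 1 currently references foreign key B
        ResolverSketch<Integer, String, String, String> resolver =
            new ResolverSketch<>(left, v -> v.substring(0, v.indexOf('|')));
        // A late result computed against the old foreign key A is discarded.
        System.out.println(resolver.resolve(1, "A", "joined-on-A").isPresent());
        // A result computed against the current foreign key B is propagated.
        System.out.println(resolver.resolve(1, "B", "joined-on-B").isPresent());
    }
}
```

The point of the sketch is that the check needs only the left table and the extractor, so no extra store of offsets or timestamps is involved in the decision itself.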
Thanks again

Adam

On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:

> John,
>
> Thanks a lot for the suggestions on refactoring the wiki. I agree with you
> that the KIP proposal should be easy to understand for anyone reading it
> in the future, and hence should provide a good summary of the
> user-facing interfaces, as well as rejected alternatives to represent
> briefly "how we came a long way to this conclusion, and what we have
> argued, disagreed, and agreed about, etc" so that readers do not need to
> dig into the DISCUSS thread to get all the details. We can, of course, keep
> the implementation details like "workflows" on the wiki page as an addendum
> section since it also has correlations.
>
> Regarding your proposal on comment 6): that's a very interesting idea! Just
> to clarify that I understand it fully correctly: the proposal's resulting
> topology is still the same as the current proposal, where we will have 3
> sub-topologies for this operator:
>
> #1: rekey left table
>    -> source from the left upstream, send to rekey-processor to generate
> combined key, and then sink to copartition topic.
>
> #2: first-join with right table
>    -> source from the right table upstream, materialize the right table.
>    -> source from the co-partition topic, materialize the rekeyed left
> table, join with the right table, rekey back, and then sink to the
> rekeyed-back topic.
>
> #3: second join
>    -> source from the rekeyed-back topic, materialize the rekeyed back
> table.
>    -> source from the left upstream, materialize the left table, join with
> the rekeyed back table.
>
> Sub-topology #1 and #3 may be merged to a single sub-topology since both of
> them read from the left table source stream. In this workflow, we need to
> materialize 4 tables (left table in #3, right table in #2, rekeyed left
> table in #2, rekeyed-back table in #3), and 2 repartition topics
> (copartition topic, rekeyed-back topic).
>
> Compared with Adam's current proposal in the workflow overview, it has the
> same num.materialize tables (left table, rekeyed left table, right table,
> out-of-ordering resolver table), and same num.internal topics (two). The
> advantage is that on the copartition topic, we can save bandwidth by not
> sending value, and in #2 the rekeyed left table is smaller since we do not
> have any values to materialize. Is that right?
>
>
> Guozhang
>
>
> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Adam,
> >
> > Given that the committers are all pretty busy right now, I think that it
> > would help if you were to refactor the KIP a little to reduce the workload
> > for reviewers.
> >
> > I'd recommend the following changes:
> > * relocate all internal details to a section at the end called something
> > like "Implementation Notes".
> > * rewrite the rest of the KIP to be as succinct as possible and mention only
> > publicly-facing API changes.
> > ** for example, the interface that you've already listed there, as well as
> > a textual description of the guarantees we'll be providing (join result is
> > copartitioned with the LHS, and the join result is guaranteed correct)
> >
> > A good target would be that the whole main body of the KIP, including
> > Status, Motivation, Proposal, Justification, and Rejected Alternatives all
> > fit "above the fold" (i.e., all fit on the screen at a comfortable zoom
> > level).
> > I think the only real Rejected Alternative that bears mention at this point
> > is KScatteredTable, which you could just include the executive summary on
> > (no implementation details), and link to extra details in the
> > Implementation Notes section.
> >
> > Taking a look at the wiki page, ~90% of the text there is internal detail,
> > which is useful for the dubious, but doesn't need to be ratified in a vote
> > (and would be subject to change without notice in the future anyway).
> > There's also a lot of conflicting discussion, as you've very respectfully > > tried to preserve the original proposal from Jan while adding your own. > > Isolating all this information in a dedicated section at the bottom frees > > the voters up to focus on the public API part of the proposal, which is > > really all they need to consider. > > > > Plus, it'll be clear to future readers which parts of the document are > > enduring, and which parts are a snapshot of our implementation thinking > at > > the time. > > > > I'm suggesting this because I suspect that the others haven't made time > to > > review it partly because it seems daunting. If it seems like it would be > a > > huge time investment to review, people will just keep putting it off. But > > if the KIP is a single page, then they'll be more inclined to give it a > > read. > > > > Honestly, I don't think the KIP itself is that controversial (apart from > > the scattered table thing (sorry, Jan) ). Most of the discussion has been > > around the implementation, which we can continue more effectively in a PR > > once the KIP has passed. > > > > How does that sound? > > -John > > > > On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellem...@gmail.com > > > > wrote: > > > > > 1) I believe that the resolution mechanism John has proposed is > > sufficient > > > - it is clean and easy and doesn't require additional RocksDB stores, > > which > > > reduces the footprint greatly. I don't think we need to resolve based > on > > > timestamp or offset anymore, but if we decide to do to that would be > > within > > > the bounds of the existing API. > > > > > > 2) Is the current API sufficient, or does it need to be altered to go > > back > > > to vote? > > > > > > 3) KScatteredTable implementation can always be added in a future > > revision. > > > This API does not rule it out. 
This implementation of this function > would > > > simply be replaced with `KScatteredTable.resolve()` while still > > maintaining > > > the existing API, thereby giving both features as Jan outlined earlier. > > > Would this work? > > > > > > > > > Thanks Guozhang, John and Jan > > > > > > > > > > > > > > > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io> > wrote: > > > > > > > Hi, all, > > > > > > > > >> In fact, we > > > > >> can just keep a single final-result store with timestamps and > reject > > > > values > > > > >> that have a smaller timestamp, is that right? > > > > > > > > > Which is the correct output should at least be decided on the > offset > > of > > > > > the original message. > > > > > > > > Thanks for this point, Jan. > > > > > > > > KIP-258 is merely to allow embedding the record timestamp in the k/v > > > > store, > > > > as well as providing a storage-format upgrade path. > > > > > > > > I might have missed it, but I think we have yet to discuss whether > it's > > > > safe > > > > or desirable just to swap topic-ordering our for timestamp-ordering. > > This > > > > is > > > > a very deep topic, and I think it would only pollute the current > > > > discussion. > > > > > > > > What Adam has proposed is safe, given the *current* ordering > semantics > > > > of the system. If we can agree on his proposal, I think we can merge > > the > > > > feature well before the conversation about timestamp ordering even > > takes > > > > place, much less reaches a conclusion. In the mean time, it would > seem > > to > > > > be unfortunate to have one join operator with different ordering > > > semantics > > > > from every other KTable operator. > > > > > > > > If and when that timestamp discussion takes place, many (all?) KTable > > > > operations > > > > will need to be updated, rendering the many:one join a small marginal > > > cost. 
> > > > > > > > And, just to plug it again, I proposed an algorithm above that I > > believe > > > > provides > > > > correct ordering without any additional metadata, and regardless of > the > > > > ordering semantics. I didn't bring it up further, because I felt the > > KIP > > > > only needs > > > > to agree on the public API, and we can discuss the implementation at > > > > leisure in > > > > a PR... > > > > > > > > Thanks, > > > > -John > > > > > > > > > > > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak < > jan.filip...@trivago.com > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > On 10.12.2018 07:42, Guozhang Wang wrote: > > > > > > Hello Adam / Jan / John, > > > > > > > > > > > > Sorry for being late on this thread! I've finally got some time > > this > > > > > > weekend to cleanup a load of tasks on my queue (actually I've > also > > > > > realized > > > > > > there are a bunch of other things I need to enqueue while > cleaning > > > them > > > > > up > > > > > > --- sth I need to improve on my side). So here are my thoughts: > > > > > > > > > > > > Regarding the APIs: I like the current written API in the KIP. > More > > > > > > generally I'd prefer to keep the 1) one-to-many join > > functionalities > > > as > > > > > > well as 2) other join types than inner as separate KIPs since 1) > > may > > > > > worth > > > > > > a general API refactoring that can benefit not only foreignkey > > joins > > > > but > > > > > > collocate joins as well (e.g. an extended proposal of > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup > > > > > ), > > > > > > and I'm not sure if other join types would actually be needed > > (maybe > > > > left > > > > > > join still makes sense), so it's better to > > > > wait-for-people-to-ask-and-add > > > > > > than add-sth-that-no-one-uses. > > > > > > > > > > > > Regarding whether we enforce step 3) / 4) v.s. 
introducing a > > > > > > KScatteredTable for users to inject their own optimization: I'd > > > prefer > > > > to > > > > > > do the current option as-is, and my main rationale is for > > > optimization > > > > > > rooms inside the Streams internals and the API succinctness. For > > > > advanced > > > > > > users who may indeed prefer KScatteredTable and do their own > > > > > optimization, > > > > > > while it is too much of the work to use Processor API directly, I > > > think > > > > > we > > > > > > can still extend the current API to support it in the future if > it > > > > > becomes > > > > > > necessary. > > > > > > > > > > no internal optimization potential. it's a myth > > > > > > > > > > ¯\_(ツ)_/¯ > > > > > > > > > > :-) > > > > > > > > > > > > > > > > > Another note about step 4) resolving out-of-ordering data, as I > > > > mentioned > > > > > > before I think with KIP-258 (embedded timestamp with key-value > > store) > > > > we > > > > > > can actually make this step simpler than the current proposal. In > > > fact, > > > > > we > > > > > > can just keep a single final-result store with timestamps and > > reject > > > > > values > > > > > > that have a smaller timestamp, is that right? > > > > > > > > > > Which is the correct output should at least be decided on the > offset > > of > > > > > the original message. > > > > > > > > > > > > > > > > > > > > > > > That's all I have in mind now. Again, great appreciation to Adam > to > > > > make > > > > > > such HUGE progress on this KIP! > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak < > > > jan.filip...@trivago.com> > > > > > > wrote: > > > > > > > > > > > >> If they don't find the time: > > > > > >> They usually take the opposite path from me :D > > > > > >> so the answer would be clear. > > > > > >> > > > > > >> hence my suggestion to vote. 
> > > > > >> > > > > > >> > > > > > >> On 04.12.2018 21:06, Adam Bellemare wrote: > > > > > >>> Hi Guozhang and Matthias > > > > > >>> > > > > > >>> I know both of you are quite busy, but we've gotten this KIP > to a > > > > point > > > > > >>> where we need more guidance on the API (perhaps a bit of a > > > > tie-breaker, > > > > > >> if > > > > > >>> you will). If you have anyone else you may think should look at > > > this, > > > > > >>> please tag them accordingly. > > > > > >>> > > > > > >>> The scenario is as such: > > > > > >>> > > > > > >>> Current Option: > > > > > >>> API: > > > > > >>> > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces > > > > > >>> 1) Rekey the data to CombinedKey, and shuffles it to the > > partition > > > > with > > > > > >> the > > > > > >>> foreignKey (repartition 1) > > > > > >>> 2) Join the data > > > > > >>> 3) Shuffle the data back to the original node (repartition 2) > > > > > >>> 4) Resolve out-of-order arrival / race condition due to > > foreign-key > > > > > >> changes. > > > > > >>> > > > > > >>> Alternate Option: > > > > > >>> Perform #1 and #2 above, and return a KScatteredTable. > > > > > >>> - It would be keyed on a wrapped key function: <CombinedKey<KO, > > K>, > > > > VR> > > > > > >> (KO > > > > > >>> = Other Table Key, K = This Table Key, VR = Joined Result) > > > > > >>> - KScatteredTable.resolve() would perform #3 and #4 but > > otherwise a > > > > > user > > > > > >>> would be able to perform additional functions directly from the > > > > > >>> KScatteredTable (TBD - currently out of scope). > > > > > >>> - John's analysis 2-emails up is accurate as to the tradeoffs. > > > > > >>> > > > > > >>> Current Option is coded as-is. 
Alternate option is possible, > but > > > will > > > > > >>> require for implementation details to be made in the API and > some > > > > > >> exposure > > > > > >>> of new data structures into the API (ie: CombinedKey). > > > > > >>> > > > > > >>> I appreciate any insight into this. > > > > > >>> > > > > > >>> Thanks. > > > > > >>> > > > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare < > > > > > adam.bellem...@gmail.com> > > > > > >>> wrote: > > > > > >>> > > > > > >>>> Hi John > > > > > >>>> > > > > > >>>> Thanks for your feedback and assistance. I think your summary > is > > > > > >> accurate > > > > > >>>> from my perspective. Additionally, I would like to add that > > there > > > > is a > > > > > >> risk > > > > > >>>> of inconsistent final states without performing the > resolution. > > > This > > > > > is > > > > > >> a > > > > > >>>> major concern for me as most of the data I have dealt with is > > > > produced > > > > > >> by > > > > > >>>> relational databases. We have seen a number of cases where a > > user > > > in > > > > > the > > > > > >>>> Rails UI has modified the field (foreign key), realized they > > made > > > a > > > > > >>>> mistake, and then updated the field again with a new key. The > > > events > > > > > are > > > > > >>>> propagated out as they are produced, and as such we have had > > > > > real-world > > > > > >>>> cases where these inconsistencies were propagated downstream > as > > > the > > > > > >> final > > > > > >>>> values due to the race conditions in the fanout of the data. > > > > > >>>> > > > > > >>>> This solution that I propose values correctness of the final > > > result > > > > > over > > > > > >>>> other factors. > > > > > >>>> > > > > > >>>> We could always move this function over to using a > > KScatteredTable > > > > > >>>> implementation in the future, and simply deprecate it this > join > > > API > > > > in > > > > > >>>> time. 
I think I would like to hear more from some of the other > > > major > > > > > >>>> committers on which course of action they would think is best > > > before > > > > > any > > > > > >>>> more coding is done. > > > > > >>>> > > > > > >>>> Thanks again > > > > > >>>> > > > > > >>>> Adam > > > > > >>>> > > > > > >>>> > > > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler < > j...@confluent.io> > > > > > wrote: > > > > > >>>> > > > > > >>>>> Hi Jan and Adam, > > > > > >>>>> > > > > > >>>>> Wow, thanks for doing that test, Adam. Those results are > > > > encouraging. > > > > > >>>>> > > > > > >>>>> Thanks for your performance experience as well, Jan. I agree > > that > > > > > >> avoiding > > > > > >>>>> unnecessary join outputs is especially important when the > > fan-out > > > > is > > > > > so > > > > > >>>>> high. I suppose this could also be built into the > > implementation > > > > > we're > > > > > >>>>> discussing, but it wouldn't have to be specified in the KIP > > > (since > > > > > >> it's an > > > > > >>>>> API-transparent optimization). > > > > > >>>>> > > > > > >>>>> As far as whether or not to re-repartition the data, I didn't > > > bring > > > > > it > > > > > >> up > > > > > >>>>> because it sounded like the two of you agreed to leave the > KIP > > > > as-is, > > > > > >>>>> despite the disagreement. > > > > > >>>>> > > > > > >>>>> If you want my opinion, I feel like both approaches are > > > reasonable. > > > > > >>>>> It sounds like Jan values more the potential for developers > to > > > > > optimize > > > > > >>>>> their topologies to re-use the intermediate nodes, whereas > Adam > > > > > places > > > > > >>>>> more > > > > > >>>>> value on having a single operator that people can use without > > > extra > > > > > >> steps > > > > > >>>>> at the end. 
> > > > > >>>>> > > > > > >>>>> Personally, although I do find it exceptionally annoying > when a > > > > > >> framework > > > > > >>>>> gets in my way when I'm trying to optimize something, it > seems > > > > better > > > > > >> to > > > > > >>>>> go > > > > > >>>>> for a single operation. > > > > > >>>>> * Encapsulating the internal transitions gives us significant > > > > > latitude > > > > > >> in > > > > > >>>>> the implementation (for example, joining only at the end, not > > in > > > > the > > > > > >>>>> middle > > > > > >>>>> to avoid extra data copying and out-of-order resolution; how > we > > > > > >> represent > > > > > >>>>> the first repartition keys (combined keys vs. value vectors), > > > > etc.). > > > > > >> If we > > > > > >>>>> publish something like a KScatteredTable with the > > > right-partitioned > > > > > >> joined > > > > > >>>>> data, then the API pretty much locks in the implementation as > > > well. > > > > > >>>>> * The API seems simpler to understand and use. I do mean > > "seems"; > > > > if > > > > > >>>>> anyone > > > > > >>>>> wants to make the case that KScatteredTable is actually > > simpler, > > > I > > > > > >> think > > > > > >>>>> hypothetical usage code would help. From a relational algebra > > > > > >> perspective, > > > > > >>>>> it seems like KTable.join(KTable) should produce a new KTable > > in > > > > all > > > > > >>>>> cases. > > > > > >>>>> * That said, there might still be room in the API for a > > different > > > > > >>>>> operation > > > > > >>>>> like what Jan has proposed to scatter a KTable, and then do > > > things > > > > > like > > > > > >>>>> join, re-group, etc from there... I'm not sure; I haven't > > thought > > > > > >> through > > > > > >>>>> all the consequences yet. > > > > > >>>>> > > > > > >>>>> This is all just my opinion after thinking over the > discussion > > so > > > > > >> far... 
> > > > > >>>>> -John
> > > > > >>>>>
> > > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Updated the PR to take into account John's feedback.
> > > > > >>>>>>
> > > > > >>>>>> I did some preliminary testing for the performance of the prefixScan. I
> > > > > >>>>>> have attached the file, but I will also include the text in the body here
> > > > > >>>>>> for archival purposes (I am not sure what happens to attached files). I
> > > > > >>>>>> also updated the PR and the KIP accordingly.
> > > > > >>>>>>
> > > > > >>>>>> Summary: It scales exceptionally well for scanning large numbers of
> > > > > >>>>>> records. As Jan mentioned previously, the real issue would be more around
> > > > > >>>>>> processing the resulting records after obtaining them. For instance, it
> > > > > >>>>>> takes approximately 80-120 ms to flush the buffer and a further 35-85 ms
> > > > > >>>>>> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
> > > > > >>>>>> through the records just to generate a simple count takes ~40 times longer
> > > > > >>>>>> than the flush + scan combined.
> > > > > >>>>>>
> > > > > >>>>>> ============================================================
> > > > > >>>>>> Setup:
> > > > > >>>>>> ============================================================
> > > > > >>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> > > > > >>>>>> CPU: i7 2.2 GHz.
> > > > > >>>>>>
> > > > > >>>>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams
> > > > > >>>>>> RocksDB implementation (RocksDB.java, basically just avoiding the
> > > > > >>>>>> ProcessorContext). There are no modifications to the default RocksDB
> > > > > >>>>>> values provided in the 2.1/trunk release.
> > > > > >>>>>>
> > > > > >>>>>> keysize = 128 bytes
> > > > > >>>>>> valsize = 512 bytes
> > > > > >>>>>>
> > > > > >>>>>> Step 1: Write X positive matching events (key = prefix + left-padded
> > > > > >>>>>> auto-incrementing integer)
> > > > > >>>>>> Step 2: Write 10X negative matching events (key = left-padded
> > > > > >>>>>> auto-incrementing integer)
> > > > > >>>>>> Step 3: Perform flush
> > > > > >>>>>> Step 4: Perform prefixScan
> > > > > >>>>>> Step 5: Iterate through the returned Iterator and validate the count of
> > > > > >>>>>> expected events.
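For readers who want to see the shape of the test steps above, the following is a rough sketch only. It is not the benchmark that produced the numbers in this thread: a `TreeMap` stands in for the sorted RocksDB key space, there is no flush step, keys are shorter than 128 bytes, and the names `PrefixScanSketch`, `populate`, and `prefixScanCount` are hypothetical. Timings from it say nothing about RocksDB.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the benchmark: a TreeMap plays the role of the
// sorted key space, so only the scan logic (not the performance) carries over.
final class PrefixScanSketch {

    // Steps 1 and 2: write x matching keys (prefix + left-padded integer)
    // and 10x non-matching keys (left-padded integer only).
    static NavigableMap<String, byte[]> populate(int x, String prefix) {
        NavigableMap<String, byte[]> store = new TreeMap<>();
        byte[] value = new byte[512]; // valsize from the setup above
        for (int i = 0; i < x; i++) {
            store.put(prefix + String.format("%010d", i), value);
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(String.format("%010d", i), value);
        }
        return store;
    }

    // Steps 4 and 5: seek to the first key >= prefix, then walk forward and
    // count until the first key that no longer starts with the prefix.
    static int prefixScanCount(NavigableMap<String, byte[]> store, String prefix) {
        int count = 0;
        for (String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys are sorted, so all matches are contiguous
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int x = 1000;
        NavigableMap<String, byte[]> store = populate(x, "fk-");
        long start = System.nanoTime();
        int matches = prefixScanCount(store, "fk-");
        long micros = (System.nanoTime() - start) / 1_000;
        System.out.println(matches + " matches out of " + store.size()
            + " keys, scan took " + micros + " us");
    }
}
```

The early `break` is what keeps a prefix scan cheap relative to a full iteration: the cost is proportional to the number of matching keys, not to the size of the store.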
> > > > > >>>>>>
> > > > > >>>>>> ============================================================
> > > > > >>>>>> Results:
> > > > > >>>>>> ============================================================
> > > > > >>>>>> X = 1k (11k events total)
> > > > > >>>>>> Flush Time = 39 ms
> > > > > >>>>>> Scan Time = 7 ms
> > > > > >>>>>> 6.9 MB disk
> > > > > >>>>>>
> > > > > >>>>>> ------------------------------------------------------------
> > > > > >>>>>> X = 10k (110k events total)
> > > > > >>>>>> Flush Time = 45 ms
> > > > > >>>>>> Scan Time = 8 ms
> > > > > >>>>>> 127 MB
> > > > > >>>>>>
> > > > > >>>>>> ------------------------------------------------------------
> > > > > >>>>>> X = 100k (1.1M events total)
> > > > > >>>>>> Test1:
> > > > > >>>>>> Flush Time = 60 ms
> > > > > >>>>>> Scan Time = 12 ms
> > > > > >>>>>> 678 MB
> > > > > >>>>>>
> > > > > >>>>>> Test2:
> > > > > >>>>>> Flush Time = 45 ms
> > > > > >>>>>> Scan Time = 7 ms
> > > > > >>>>>> 576 MB
> > > > > >>>>>>
> > > > > >>>>>> ------------------------------------------------------------
> > > > > >>>>>> X = 1M (11M events total)
> > > > > >>>>>> Test1:
> > > > > >>>>>> Flush Time = 52 ms
> > > > > >>>>>> Scan Time = 19 ms
> > > > > >>>>>> 7.2 GB
> > > > > >>>>>>
> > > > > >>>>>> Test2:
> > > > > >>>>>> Flush Time = 84 ms
> > > > > >>>>>> Scan Time = 34 ms
> > > > > >>>>>> 9.1 GB
> > > > > >>>>>>
> > > > > >>>>>> ------------------------------------------------------------
> > > > > >>>>>> X = 2.5M (27.5M events total)
> > > > > >>>>>> Test1:
> > > > > >>>>>> Flush Time = 82 ms
> > > > > >>>>>> Scan Time = 63 ms
> > > > > >>>>>> 17 GB - 276 sst files
> > > > > >>>>>>
> > > > > >>>>>> Test2:
> > > > > >>>>>> Flush Time = 116 ms
> > > > > >>>>>> Scan Time = 35 ms
> > > > > >>>>>> 23 GB - 361 sst files
> > > > > >>>>>>
> > > > > >>>>>> Test3:
> > > > > >>>>>> Flush Time = 103 ms
> > > > > >>>>>> Scan Time = 82 ms
> > > > > >>>>>> 19 GB - 300 sst files
> > > > > >>>>>>
> > > > > >>>>>> ------------------------------------------------------------
> > > > > >>>>>>
> > > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
> > > > > >>>>>> to X = 10M (110M events) but RocksDB was going into the 100GB+ range and my
> > > > > >>>>>> laptop ran out of disk. More extensive testing could be done but I suspect
> > > > > >>>>>> that it would be in line with what we're seeing in the results above.
> > > > > >>>>>>
> > > > > >>>>>> At this point in time, I think the only major discussion point is really
> > > > > >>>>>> around what Jan and I have disagreed on: repartitioning back + resolving
> > > > > >>>>>> potential out of order issues or leaving that up to the client to handle.
> > > > > >>>>>>
> > > > > >>>>>> Thanks folks,
> > > > > >>>>>>
> > > > > >>>>>> Adam
> > > > > >>>>>>
> > > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> > > > > >>>>>>>> Hi all,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Sorry that this discussion petered out... I think the 2.1 release caused an
> > > > > >>>>>>>> extended distraction that pushed it off everyone's radar (which was
> > > > > >>>>>>>> precisely Adam's concern). Personally, I've also had some extended
> > > > > >>>>>>>> distractions of my own that kept (and continue to keep) me preoccupied.
> > > > > >>>>>>>>
> > > > > >>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the right
> > > > > >>>>>>>> track!
> > > > > >>>>>>>>
> > > > > >>>>>>>> I've gone back and reviewed the whole KIP document and the prior
> > > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
> > > > > >>>>>>>>
> > > > > >>>>>>>> API Thoughts:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could we
> > > > > >>>>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip the design around
> > > > > >>>>>>>> and make it a oneToManyJoin?
> > > > > >>>>>>>>
> > > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it seems
> > > > > >>>>>>>> like it might trick some people into using it for a one-to-one join.
> > > > > >>>>>>>> This would work, of course, but it would be super inefficient compared to a
> > > > > >>>>>>>> simple rekey-and-join.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 2. I might have missed it, but I don't think it's specified whether it's an
> > > > > >>>>>>>> inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ),
> > > > > >>>>>>>> the rest can be achieved by filtering or by handling it in the ValueJoiner.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
> > > > > >>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in
> > > > > >>>>>>>> the Streams API, so it's confusing, but instead of three Serialized args, I
> > > > > >>>>>>>> think it would be better to have one that allows (optionally) setting the 4
> > > > > >>>>>>>> incoming serdes. The result serde is defined by the Materialized. The
> > > > > >>>>>>>> incoming serdes can be optional because they might already be available on
> > > > > >>>>>>>> the source KTables, or the default serdes from the config might be
> > > > > >>>>>>>> applicable.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting
> > > > > >>>>>>>> one, and it seems like it might actually be harmful, since the rekey
> > > > > >>>>>>>> operation needs to produce results that are co-partitioned with the "other"
> > > > > >>>>>>>> KTable.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 4.
> > > > > >>>>>>>> I'm fine with the "reserved word" header, but I didn't actually follow
> > > > > >>>>>>>> what Matthias meant about namespacing requiring "deserializing" the record
> > > > > >>>>>>>> header. The headers are already Strings, so I don't think that
> > > > > >>>>>>>> deserialization is required. If we applied the namespace at source nodes
> > > > > >>>>>>>> and stripped it at sink nodes, this would be practically no overhead. The
> > > > > >>>>>>>> advantage of the namespace idea is that no public API change wrt headers
> > > > > >>>>>>>> needs to happen, and no restrictions need to be placed on users' headers.
> > > > > >>>>>>>>
> > > > > >>>>>>>> (Although I'm wondering if we can get away without the header at all...
> > > > > >>>>>>>> stay tuned)
> > > > > >>>>>>>>
> > > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM table growing without
> > > > > >>>>>>>> bound. As I read it, the HWM table is effectively implementing OCC to
> > > > > >>>>>>>> resolve the problem you noted with disordering when the rekey is
> > > > > >>>>>>>> reversed... particularly notable when the FK changes. As such, it only
> > > > > >>>>>>>> needs to track the most recent "version" (the offset in the source
> > > > > >>>>>>>> partition) of each key. Therefore, it should have the same number of keys
> > > > > >>>>>>>> as the source table at all times.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I see that you are aware of KIP-258, which I think might be relevant in a
> > > > > >>>>>>>> couple of ways.
> > > > > >>>>>>>> One: it's just about storing the timestamp in the state store, but the ultimate idea is to effectively use the timestamp as an OCC "version" to drop disordered updates. You wouldn't want to use the timestamp for this operation, but if you were to use a similar mechanism to store the source offset in the store alongside the re-keyed values, then you could avoid a separate table.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking at all and resolve out-of-order during a final join instead...
> > > > > >>>>>>>>
> > > > > >>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
> > > > > >>>>>>>>
> > > > > >>>>>>>> Left table:
> > > > > >>>>>>>> 1: (A, xyz)
> > > > > >>>>>>>> 2: (B, asd)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Right table:
> > > > > >>>>>>>> A: EntityA
> > > > > >>>>>>>> B: EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> We could do a rekey as you proposed with a combined key, but not propagating the value at all..
> > > > > >>>>>>>> Rekey table:
> > > > > >>>>>>>> A-1: (dummy value)
> > > > > >>>>>>>> B-2: (dummy value)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Which we then join with the right table to produce:
> > > > > >>>>>>>> A-1: EntityA
> > > > > >>>>>>>> B-2: EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> Which gets rekeyed back:
> > > > > >>>>>>>> 1: A, EntityA
> > > > > >>>>>>>> 2: B, EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> And finally we do the actual join:
> > > > > >>>>>>>> Result table:
> > > > > >>>>>>>> 1: ((A, xyz), EntityA)
> > > > > >>>>>>>> 2: ((B, asd), EntityB)
> > > > > >>>>>>>>
> > > > > >>>>>>>> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
> > > > > >>>>>>>>
> > > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.
> > > > > >>>>>>>
> > > > > >>>>>>> Will check Adam's patch, but this should work. As mentioned often I am not convinced on partitioning back for the user automatically.
> > > > > >>>>>>> I think this is the real performance eater ;)
> > > > > >>>>>>>
> > > > > >>>>>>>> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?
> > > > > >>>>>>>
> > > > > >>>>>>> Make it work, make it fast, make it beautiful. The topmost thing here is / was correctness. In practice I do not measure the performance of the range scan. Usual cases I run this with is emitting 500k - 1kk rows on a left hand side change. The range scan is just the work you gotta do, also when you pack your data into different formats, usually the rocks performance is very tight to the size of the data and we can't really change that. It is more important for users to prevent useless updates to begin with. My left hand side is guarded to drop changes that are not going to change my join output.
> > > > > >>>>>>>
> > > > > >>>>>>> usually it's:
> > > > > >>>>>>>
> > > > > >>>>>>> drop unused fields and then don't forward if old.equals(new)
> > > > > >>>>>>>
> > > > > >>>>>>> regarding to the performance of creating an iterator for smaller fanouts, users can still just do a group by first then anyways.
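Stepping back to point 6 for a moment: the rekey, join, rekey-back, and final comparison can be sketched in a few lines of plain Python. This is only an in-memory illustration using dicts, not Kafka Streams code; the function names `rekey_join_rekey_back` and `resolve` are made up for this sketch, and it assumes the rekeyed-back event carries the foreign key it was joined on, as in the "1: A, EntityA" example.

```python
def rekey_join_rekey_back(left, right):
    """left: {pk: (fk, data)}, right: {fk: entity} -> list of (pk, fk, entity)."""
    # Rekey the left table to a combined (fk, pk) key; no value is propagated.
    rekeyed = {(fk, pk): None for pk, (fk, _data) in left.items()}
    # Join with the right table on the fk part of the combined key.
    joined = {key: right[key[0]] for key in rekeyed if key[0] in right}
    # Rekey back to the original pk, carrying the fk along for the final check.
    return [(pk, fk, entity) for (fk, pk), entity in joined.items()]


def resolve(left, event):
    """Final join: keep the event only if its fk is still the left row's fk."""
    pk, fk, entity = event
    current = left.get(pk)
    if current is None or current[0] != fk:
        return None  # outdated: the left row changed its fk or was deleted
    return pk, (current, entity)


left = {1: ("A", "xyz"), 2: ("B", "asd")}
right = {"A": "EntityA", "B": "EntityB"}

# Events produced before the update are still "in flight" when 1 changes its fk.
in_flight = rekey_join_rekey_back(left, right)
left[1] = ("B", "xyz")
in_flight += rekey_join_rekey_back({1: left[1]}, right)

results = [resolve(left, event) for event in in_flight]
```

The stale (1, A, EntityA) event is dropped because the left table now says 1 points at B, while the (1, B, EntityB) event passes regardless of the order in which the two arrive.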
> > > > > >>>>>>>
> > > > > >>>>>>>> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Left table:
> > > > > >>>>>>>> 1: A,...
> > > > > >>>>>>>> 2: B,...
> > > > > >>>>>>>> 3: A,...
> > > > > >>>>>>>>
> > > > > >>>>>>>> Gets re-keyed:
> > > > > >>>>>>>> A: [1, 3]
> > > > > >>>>>>>> B: [2]
> > > > > >>>>>>>>
> > > > > >>>>>>>> Then, the rhs part of the join would only need a regular single-key lookup. Of course we have to deal with the problem of large values, as there's no bound on the number of lhs records that can reference rhs records. Offhand, I'd say we could page the values, so when one row is past the threshold, we append the key for the next page. Then in most cases, it would be a single key lookup, but for large fan-out updates, it would be one per (max value size)/(avg lhs key size).
> > > > > >>>>>>>>
> > > > > >>>>>>>> This seems more complex, though... Plus, I think there's some extra tracking we'd need to do to know when to emit a retraction. For example, when record 1 is deleted, the re-key table would just have (A: [3]). Some kind of tombstone is needed so that the join result for 1 can also be retracted.
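A rough sketch of the paged key-vector idea, again in plain Python over a dict rather than a real state store. It simplifies the proposal in one respect, which is an assumption of this sketch: instead of appending a pointer to the next page inside the value, pages are addressed as (fk, page_number). The names `lookup`, `add_reference`, `remove_reference` and the tiny `page_size` are hypothetical.

```python
def lookup(store, fk):
    """Collect every pk referencing fk by walking pages 0, 1, 2, ... in order."""
    result, page = [], 0
    while (fk, page) in store:
        result.extend(store[(fk, page)])
        page += 1
    return result


def add_reference(store, fk, pk, page_size=2):
    """Record that left row pk references fk, using the first page with room."""
    if pk in lookup(store, fk):
        return
    page = 0
    while len(store.setdefault((fk, page), [])) >= page_size:
        page += 1
    store[(fk, page)].append(pk)


def remove_reference(store, fk, pk):
    """Drop pk from fk's pages and return a tombstone so the join result
    for pk can be retracted downstream; return None if pk was not present."""
    page = 0
    while (fk, page) in store:
        if pk in store[(fk, page)]:
            store[(fk, page)].remove(pk)  # page is kept even if it becomes empty
            return (pk, None)
        page += 1
    return None


store = {}
for pk, fk in [(1, "A"), (2, "B"), (3, "A"), (4, "A")]:
    add_reference(store, fk, pk)

before = lookup(store, "A")
tombstone = remove_reference(store, "A", 1)
after = lookup(store, "A")
```

With a page size of 2, A needs two pages for three references, so a right-side change to A costs two lookups instead of a range scan. Deleting record 1 yields an explicit (1, None) tombstone, which is the extra tracking the message above says would be needed.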
> > > > > >>>>>>>>
> > > > > >>>>>>>> That's all!
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the discussion has been slow.
> > > > > >>>>>>>> -John
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> I'd say you can just call the vote.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> that happens all the time, and if something comes up, it just goes back to discuss.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> would not expect too much attention with another email in this thread.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> best Jan
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> > > > > >>>>>>>>>> Hello Contributors
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Main points that need addressing:
> > > > > >>>>>>>>>> 1) Any way (or benefit) in structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2) Overall strategy for joining + resolving.
> > > > > >>>>>>>>>> This thread has much discourse between Jan and I between the current highwater mark proposal and a groupBy + reduce proposal. I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a vote?
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Adam
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Hi Jan
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the amount of segments you want and how large they are. To the best of my understanding, what happens is that the segments are automatically rolled over as new data with new timestamps are created. We use this exact functionality in some of the work done internally at my company. For reference, this is the hopping windowed store.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> In the code that I have provided, there are going to be two 24h segments. When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), such that it is the record time, not the clock time, which is used.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> To the best of my understanding, the timestamps are retained when restoring from the changelog.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Basically, this is heavy-handed way to deal with TTL at a segment-level, instead of at an individual record level.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> Will that work? I expected it to blow up with ClassCastException or similar.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> You either would have to specify the window you fetch/put or iterate across all windows the key was found in, right?
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I just hope the window-store doesn't check stream-time under the hood; that would be a questionable interface.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> If it does: did you see my comment on checking all the windows earlier? that would be needed to actually give reasonable time guarantees.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Best
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>> Hi Jan
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Check for " highwaterMat " in the PR. I only changed the state store, not the ProcessorSupplier.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> @Guozhang
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> @Jan
> > > > > >>>>>>>>>>>>>>> Thanks for your explanations.
> > > > > >>>>>>>>>>>>>>> That being said, I will not be moving ahead with an implementation using reshuffle/groupBy solution as you propose. That being said, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet that as an alternate solution. As it stands right now, I do not really have more time to invest into alternatives without there being a strong indication from the binding voters which they would prefer.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hey, total no worries. I think I personally gave up on the streams DSL for some time already, otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on PAPI.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions.
> > > > > >>>>>>>>>>>>>>> In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
> > > > > >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR? expected it to change to Windowed<K>,Long Missing something?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms.
> > > > > >>>>>>>>>>>>>>>> But as part of KIP-258 we do want to have "handling out-of-order data for source KTable" such that instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamps, i.e. following timestamp ordering.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Guozhang
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Hello Adam,
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another current on-going KIP may actually help:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTable), and then one of its usages, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if its timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high watermark based filtering, while you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence non-windowed KTables only, but for windowed KTables we do not really have a good support for their joins anyways (https://issues.apache.org/jira/browse/KAFKA-7107). I think we can just consider non-windowed KTable-KTable non-key joins for now. In which case, KIP-258 should help.
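The timestamp-based rejection described above amounts to a version check on write. A minimal sketch in plain Python follows; the dict stands in for a key-value store that keeps a timestamp next to each value, and the name `apply_update` is made up for this illustration. It follows the wording in the message: only updates with a strictly smaller timestamp are rejected.

```python
def apply_update(store, key, value, timestamp):
    """store maps key -> (value, timestamp). Returns True if the update was applied."""
    current = store.get(key)
    if current is not None and timestamp < current[1]:
        # Older than the latest update already held for this key: reject it,
        # i.e. follow timestamp ordering rather than arrival (offset) ordering.
        return False
    store[key] = (value, timestamp)
    return True


store = {}
first = apply_update(store, "k", "v1", 100)
late = apply_update(store, "k", "v0", 90)    # arrives later, but is older
tie = apply_update(store, "k", "v2", 100)    # equal timestamp is not "smaller"
```

The same shape works with a source offset in place of the timestamp, which is what the high-water-mark table tracks per key.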
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Guozhang
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Hi Guozhang
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Current highwater mark implementation would grow endlessly based on primary key of original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a Windowed state store of Duration N. This would allow the same behaviour, but cap the size based on time.
> > > > > >>>>>>>>>>>>>>>>>>> This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (ie: perhaps just 10 minutes of window, or perhaps 7 days...).
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the trick here. Even if I would still like to see the automatic repartitioning optional since I would just reshuffle again. With windowed store I am a little bit sceptical about how to determine the window. So essentially one could run into problems when the rapid change happens near a window border. I will check your implementation in detail, if it's problematic, we could still check _all_ windows on read with not too bad performance impact I guess.
> > > > > >>>>>>>>>>>>>>>>>> Will let you know if the implementation would be correct as is. I would not like to assume that: offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we can't expect that.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> @Jan
> > > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the diagram, it did really help. You are correct that I do not have the original primary key available, and I can see that if it was available then you would be able to add and remove events from the Map. That being said, I encourage you to finish your diagrams / charts just for clarity for everyone else.
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I understand the benefits for the rest.
> > > > > >>>>>>>>>>>>>>>>>> Sorry about the original primary key. We have join and Group by implemented our own in PAPI and basically not using any DSL (Just the abstraction). Completely missed that in original DSL it's not there and just assumed it. total brain mess up on my end. Will finish the chart as soon as i get a quiet evening this week.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> My follow up question for you is, won't the Map stay inside the State Store indefinitely after all of the changes have propagated? Isn't this effectively the same as a highwater mark state store?
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Thing is that if the map is empty, subtractor is gonna return `null` and the key is removed from the keyspace.
> > > > > >>>>>>>>>>>>>>>>>> But there is going to be a store 100%, the good thing is that I can use this store directly for materialize() / enableSendingOldValues(); it is a regular store, satisfying all guarantees needed for further groupby / join. The Windowed store is not keeping the values, so for the next stateful operation we would need to instantiate an extra store. or we have the window store also have the values then.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Long story short. if we can flip in a custom group by before repartitioning to the original primary key i think it would help the users big time in building efficient apps. Given the original primary key issue I understand that we do not have a solid foundation to build on. Leaving primary key carry along to the user. very unfortunate. I could understand the decision goes like that.
> > > > > >>>>>>>>>>>>>>>>>> I do not think it's a good decision.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Thanks
> > > > > >>>>>>>>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> please remove me from this group
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> > Hi Adam,
> > > > > >>>>>>>>>>>>>>>>>>> >
> > > > > >>>>>>>>>>>>>>>>>>> > give me some time, will make such a chart. last time i didn't get along well with giphy and ruined all your charts.
>> Hopefully I can get it done today.
>>
>> On 08.09.2018 16:00, Adam Bellemare wrote:
>>> Hi Jan
>>>
>>> I have included a diagram of what I attempted on the KIP.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>
>>> I attempted this back at the start of my own implementation of this
>>> solution, and since I could not get it to work I have since discarded
>>> the code. At this point in time, if you wish to continue pursuing your
>>> groupBy solution, I ask that you please create a diagram on the KIP
>>> carefully explaining your solution. Please feel free to use the image I
>>> just posted as a starting point. I am having trouble understanding your
>>> explanations but I think that a carefully constructed diagram will clear
>>> up any misunderstandings. Alternately, please post a comprehensive PR
>>> with your solution. I can only guess at what you mean, and since I value
>>> my own time as much as you value yours, I believe it is your
>>> responsibility to provide an implementation instead of me trying to
>>> guess.
>>>
>>> Adam
>>>
>>> On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> Hi James,
>>>>
>>>> nice to see you being interested. Kafka Streams at this point supports
>>>> all sorts of joins as long as both streams have the same key.
>>>> Adam is currently implementing a join where a KTable and a KTable can
>>>> have a one-to-many relationship (1:n). We exploit that RocksDB is a
>>>> datastore that keeps data sorted (at least it exposes an API to access
>>>> the stored data in a sorted fashion).
>>>>
>>>> I think the technical caveats are well understood now and we are
>>>> basically down to philosophy and API design (when Adam sees my newest
>>>> message).
>>>> I have a lengthy track record of losing those kinds of arguments within
>>>> the streams community and I have no clue why. So I literally can't wait
>>>> for you to churn through this thread and give your opinion on how we
>>>> should design the return type of the oneToManyJoin and how much power
>>>> we want to give to the user vs "simplicity" (where simplicity isn't
>>>> really that, as users still need to understand it, I argue).
>>>>
>>>> Waiting for you to join in on the discussion.
>>>>
>>>> Best Jan
>>>>
>>>> On 07.09.2018 15:49, James Kwan wrote:
>>>>
>>>>> I am new to this group and I found this subject interesting. Sounds
>>>>> like you guys want to implement a join table of two streams? Is there
>>>>> somewhere I can see the original requirement or proposal?
>>>>>
>>>>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>>
>>>>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>
>>>>>>> I'm currently testing using a Windowed Store to store the highwater
>>>>>>> mark. By all indications this should work fine, with the caveat
>>>>>>> being that it can only resolve out-of-order arrival for up to the
>>>>>>> size of the window (ie: 24h, 72h, etc). This would remove the
>>>>>>> possibility of it being unbounded in size.
>>>>>>>
>>>>>>> With regards to Jan's suggestion, I believe this is where we will
>>>>>>> have to remain in disagreement. While I do not disagree with your
>>>>>>> statement about there likely to be additional joins done in a
>>>>>>> real-world workflow, I do not see how you can conclusively deal with
>>>>>>> out-of-order arrival of foreign-key changes and subsequent joins. I
>>>>>>> have attempted what I think you have proposed (without a high-water,
>>>>>>> using groupBy and reduce) and found that if the foreign key changes
>>>>>>> too quickly, or the load on a stream thread is too high, the joined
>>>>>>> messages will arrive out-of-order and be incorrectly propagated,
>>>>>>> such that an intermediate event is represented as the final event.
>>>>>>>
>>>>>> Can you shed some light on your groupBy implementation? There must be
>>>>>> some sort of flaw in it. I have a suspicion where it is; I would just
>>>>>> like to confirm. The idea is bulletproof and it must be an
>>>>>> implementation mess-up. I would like to clarify before we draw a
>>>>>> conclusion.
>>>>>>
>>>>>>> Repartitioning the scattered events back to their original
>>>>>>> partitions is the only way I know how to conclusively deal with
>>>>>>> out-of-order events in a given time frame, and to ensure that the
>>>>>>> data is eventually consistent with the input events.
>>>>>>>
>>>>>>> If you have some code to share that illustrates your approach, I
>>>>>>> would be very grateful as it would remove any misunderstandings that
>>>>>>> I may have.
>>>>>>
>>>>>> Ah okay, you were looking for my code. I don't have something easily
>>>>>> readable here as it's bloated with OO-patterns.
>>>>>>
>>>>>> It's anyhow trivial:
>>>>>>
>>>>>>     @Override
>>>>>>     public T apply(K aggKey, V value, T aggregate)
>>>>>>     {
>>>>>>         Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
>>>>>>         U toModifyKey = mapper.apply(value);
>>>>>>         // This is the place where people are actually going to have
>>>>>>         // issues, and why you probably couldn't do it. We would need
>>>>>>         // to find a solution here. I didn't realize that yet.
>>>>>>         // We propagate the field in the joiner, so that we can pick
>>>>>>         // it up in an aggregate. Probably you have not thought of
>>>>>>         // this in your approach, right?
>>>>>>         // I am very open to finding a generic solution here. In my
>>>>>>         // honest opinion this is broken in KTableImpl.GroupBy, in
>>>>>>         // that it loses the keys and only maintains the aggregate key.
>>>>>>         // I abstracted it away back then, way before I was thinking
>>>>>>         // of the oneToMany join. That is why I didn't realize its
>>>>>>         // significance here.
>>>>>>         // Opinions?
>>>>>>
>>>>>>         for (V m : current)
>>>>>>         {
>>>>>>             currentStateAsMap.put(mapper.apply(m), m);
>>>>>>         }
>>>>>>         if (isAdder)
>>>>>>         {
>>>>>>             currentStateAsMap.put(toModifyKey, value);
>>>>>>         }
>>>>>>         else
>>>>>>         {
>>>>>>             currentStateAsMap.remove(toModifyKey);
>>>>>>             if (currentStateAsMap.isEmpty()) {
>>>>>>                 return null;
>>>>>>             }
>>>>>>         }
>>>>>>         return asAggregateType(currentStateAsMap);
>>>>>>     }
>>>>>>
>>>>>>> Thanks,
>>>>>>> Adam
>>>>>>>
>>>>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak
>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>
>>>>>>>> Thanks Adam for bringing Matthias up to speed about the
>>>>>>>> differences. I think re-keying back should be optional at best.
>>>>>>>> I would say we return a KScatteredTable with reshuffle() returning
>>>>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
>>>>>>>> optional.
>>>>>>>> I am also very much in favour of doing the out-of-order processing
>>>>>>>> using group by instead of high-water mark tracking, just because
>>>>>>>> unbounded growth is just scary, plus it saves us the header stuff.
>>>>>>>>
>>>>>>>> I think the abstraction of always repartitioning back is just not
>>>>>>>> so strong. The work has been done before we partition back, and
>>>>>>>> grouping by something else afterwards is really common.
>>>>>>>>
>>>>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>>>>
>>>>>>>>> Hi Matthias
>>>>>>>>>
>>>>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>>>>
>>>>>>>>>> While name spacing would be possible, it would require
>>>>>>>>>> deserializing user headers, which implies a runtime overhead. I
>>>>>>>>>> would suggest not to namespace for now to avoid the overhead. If
>>>>>>>>>> this becomes a problem in the future, we can still add name
>>>>>>>>>> spacing later on.
>>>>>>>>>
>>>>>>>>> Agreed. I will go with using a reserved string and document it.
>>>>>>>>>
>>>>>>>>>> My main concern about the design is the type of the result
>>>>>>>>>> KTable: If I understood the proposal correctly,
>>>>>>>>>
>>>>>>>>> In your example, you have table1 and table2 swapped. Here is how
>>>>>>>>> it works currently:
>>>>>>>>>
>>>>>>>>> 1) table1 has the records that contain the foreign key within
>>>>>>>>> their value.
>>>>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
>>>>>>>>> <c,(fk=B,bar=3)>
>>>>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>>>>
>>>>>>>>> 2) A Value mapper is required to extract the foreign key.
>>>>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>>>>
>>>>>>>>> The mapper is applied to each element in table1, and a new
>>>>>>>>> combined key is made:
>>>>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
>>>>>>>>> <B-c, (fk=B,bar=3)>
>>>>>>>>>
>>>>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>>>>
>>>>>>>>> a) Stream Thread with Partition 0:
>>>>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>>>>> Table2: <A,X>
>>>>>>>>>
>>>>>>>>> b) Stream Thread with Partition 1:
>>>>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>>>> Table2: <B,Y>
>>>>>>>>>
>>>>>>>>> 4) From here, they can be joined together locally by applying the
>>>>>>>>> joiner function.
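
The four steps above can be sketched in plain Java, without any Kafka Streams dependency. This is only an illustration under stated assumptions, not the KIP implementation: the class `ForeignKeyJoinSketch`, the `LeftValue` type, the `partitionFor` helper, the `"fk-key"` string form of the combined key, and the string-concatenating joiner are all hypothetical names chosen for this example. Partition numbers come from `hashCode`, so they need not match the partition 0 / partition 1 assignment used in the walkthrough.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ForeignKeyJoinSketch {

    // Value type of table1; fk is the foreign key that points at a table2 key.
    static final class LeftValue {
        final String fk;
        final int bar;
        LeftValue(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Hypothetical partitioner: only the foreign key decides the partition, so a
    // combined key such as "A-a" lands on the same partition as table2 key "A".
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(foreignKey.hashCode(), numPartitions);
    }

    static Map<String, String> join(Map<String, LeftValue> table1,
                                    Map<String, String> table2,
                                    int numPartitions) {
        List<Map<String, LeftValue>> rekeyedTable1 = new ArrayList<>();
        List<Map<String, String>> localTable2 = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            rekeyedTable1.add(new TreeMap<>());
            localTable2.add(new TreeMap<>());
        }

        // Steps 1-3: extract the foreign key, build the combined key, and route
        // the rekeyed record to the partition that owns the foreign key.
        for (Map.Entry<String, LeftValue> e : table1.entrySet()) {
            String fk = e.getValue().fk;                // foreign key mapper
            String combinedKey = fk + "-" + e.getKey(); // e.g. "A-a"
            rekeyedTable1.get(partitionFor(fk, numPartitions))
                         .put(combinedKey, e.getValue());
        }
        for (Map.Entry<String, String> e : table2.entrySet()) {
            localTable2.get(partitionFor(e.getKey(), numPartitions))
                       .put(e.getKey(), e.getValue());
        }

        // Step 4: join locally, one partition at a time.
        Map<String, String> joined = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            for (Map.Entry<String, LeftValue> e : rekeyedTable1.get(p).entrySet()) {
                String right = localTable2.get(p).get(e.getValue().fk);
                if (right != null) {
                    // Hypothetical joiner: concatenate left "bar" and right value.
                    joined.put(e.getKey(), "(" + e.getValue().bar + "," + right + ")");
                }
            }
        }
        return joined;
    }

    // The example data from the walkthrough above.
    static Map<String, String> example() {
        Map<String, LeftValue> table1 = new LinkedHashMap<>();
        table1.put("a", new LeftValue("A", 1));
        table1.put("b", new LeftValue("A", 2));
        table1.put("c", new LeftValue("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        return join(table1, table2, 2);
    }

    public static void main(String[] args) {
        System.out.println(example()); // {A-a=(1,X), A-b=(2,X), B-c=(3,Y)}
    }
}
```

The result is still keyed by the combined key at this stage; what happens to that key afterwards is exactly where the two designs discussed in this thread differ.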
>>>>>>>>>
>>>>>>>>> At this point, Jan's design and my design deviate. My design goes
>>>>>>>>> on to repartition the data post-join and resolve out-of-order
>>>>>>>>> arrival of records, finally returning the data keyed by just the
>>>>>>>>> original key. I do not expose the CombinedKey or any of the
>>>>>>>>> internals outside of the joinOnForeignKey function. This does make
>>>>>>>>> for a larger footprint, but it removes all agency for resolving
>>>>>>>>> out-of-order arrivals and handling CombinedKeys from the user. I
>>>>>>>>> believe that this makes the function much easier to use.
>>>>>>>>>
>>>>>>>>> Let me know if this helps resolve your questions, and please feel
>>>>>>>>> free to add anything else on your mind.
>>>>>>>>>
>>>>>>>>> Thanks again,
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax
>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am just catching up on this thread. I did not read everything
>>>>>>>>>> so far, but want to share a couple of initial thoughts:
>>>>>>>>>>
>>>>>>>>>> Headers: I think there is a fundamental difference between header
>>>>>>>>>> usage in this KIP and KIP-258. For 258, we add headers to
>>>>>>>>>> changelog topics that are owned by Kafka Streams and nobody else
>>>>>>>>>> is supposed to write into them. In fact, no user headers are
>>>>>>>>>> written into the changelog topic and thus, there are no
>>>>>>>>>> conflicts.
>>>>>>>>>>
>>>>>>>>>> Nevertheless, I don't see a big issue with using headers within
>>>>>>>>>> Streams. As long as we document it, we can have some "reserved"
>>>>>>>>>> header keys that users are not allowed to use when processing
>>>>>>>>>> data with Kafka Streams. IMHO, this should be ok.
> > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> I think there is a safe > way > > to > > > > > avoid > > > > > >>>>>>>>> conflicts, > > > > > >>>>>>>>>>>> since > > > > > >>>>>>>>>>>>>>>>>>> these > > > > > >>>>>>>>>>>>>>>>>>> > headers > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> are > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> only needed in internal > > > topics > > > > (I > > > > > >>>>> think): > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> For internal and > changelog > > > > > topics, > > > > > >> we > > > > > >>>>> can > > > > > >>>>>>>>>>>> namespace > > > > > >>>>>>>>>>>>>>>>>>> all headers: > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> * user-defined headers > are > > > > > >> namespaced > > > > > >>>>> as > > > > > >>>>>>>>>>>> "external." > > > > > >>>>>>>>>>>>>>>>>>> + > > > > > >>>>>>>>>>>>>>>>>>> headerKey > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> * internal headers are > > > > > namespaced as > > > > > >>>>>>>>>>>> "internal." + > > > > > >>>>>>>>>>>>>>>>>>> headerKey > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> While name spacing would > be > > > > > >> possible, > > > > > >>>>> it > > > > > >>>>>>>>> would > > > > > >>>>>>>>>>>>>>>>>>> require > > > > > >>>>>>>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>>>>>> to > > > > > >>>>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>>>>> > >>>>>>>> deserialize > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> user headers what implies > a > > > > > runtime > > > > > >>>>>>> overhead. > > > > > >>>>>>>>> I > > > > > >>>>>>>>>>>> would > > > > > >>>>>>>>>>>>>>>>>>> suggest to > > > > > >>>>>>>>>>>>>>>>>>> > no > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>> namespace for now to avoid > > the > > > > > >>>>> overhead. 
> If this becomes a problem in the future, we can still add namespacing
> later on.
>
> My main concern about the design is the type of the result KTable. If I
> understood the proposal correctly,
>
> KTable<K1,V1> table1 = ...
> KTable<K2,V2> table2 = ...
>
> KTable<K1,V3> joinedTable = table1.join(table2,...);
>
> implies that the `joinedTable` has the same key as the left input
> table.
> IMHO, this does not work because if table2 contains multiple rows that
> join with a record in table1 (which is the main purpose of a foreign
> key join), the result table would contain only a single join result,
> not multiple.
>
> Example:
>
> table1 input stream: <A,X>
> table2 input stream: <a,(A,1)>, <b,(A,2)>
>
> We use the table2 value as a foreign key to the table1 key (i.e., "A"
> joins).
> If the result key is the same as the key of table1, this implies that
> the result can be either <A, join(X,1)> or <A, join(X,2)>, but not
> both. Because they share the same key, whatever result record we emit
> later overwrites the previous result.
>
> This is the reason why Jan originally proposed to use a combination of
> both primary keys of the input tables as the key of the output table.
> This makes the keys of the output table unique, and we can store both
> in the output table:
>
> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>
> Thoughts?
>
> -Matthias
>
> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>
> > Just one remark here.
> > The high-watermark could be disregarded. The decision about the
> > forward depends on the size of the aggregated map.
> > Only 1-element maps would be unpacked and forwarded. 0-element maps
> > would be published as deletes.
> > Any other count of map entries is in "waiting for correct deletes to
> > arrive"-state.
> >
> > On 04.09.2018 21:29, Adam Bellemare wrote:
> >
> > > It does look like I could replace the second repartition store and
> > > highwater store with a groupBy and reduce. However, it looks like I
> > > would still need to store the highwater value within the
> > > materialized store, to compare the arrival of out-of-order records
> > > (assuming my understanding of THIS is correct...).
> > > This in effect is the same as the design I have now, just with the
> > > two tables merged together.
>
> --
> -- Guozhang
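
To make Matthias's example from the quoted thread concrete (table1 holding <A,X>; table2 holding <a,(A,1)> and <b,(A,2)>), the following sketch replays it with plain Java maps standing in for the tables. This is an assumption for illustration only: it does not use the Kafka Streams API, and the class and method names are made up. It contrasts keying the result by table1's key with keying it by the combined key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ForeignKeyJoinKeys {
    /** A table2 row value: a foreign key into table1 plus a payload. */
    static final class Right {
        final String foreignKey;
        final int payload;

        Right(String foreignKey, int payload) {
            this.foreignKey = foreignKey;
            this.payload = payload;
        }
    }

    /** Result keyed by table1's key only: a later match overwrites an earlier one. */
    static Map<String, String> joinKeyedByLeft(Map<String, String> table1,
                                               Map<String, Right> table2) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Right r : table2.values()) {
            String left = table1.get(r.foreignKey);
            if (left != null) {
                result.put(r.foreignKey, "join(" + left + "," + r.payload + ")");
            }
        }
        return result;
    }

    /** Result keyed by the combination of both primary keys: every match survives. */
    static Map<String, String> joinKeyedByCombinedKey(Map<String, String> table1,
                                                      Map<String, Right> table2) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Right> e : table2.entrySet()) {
            Right r = e.getValue();
            String left = table1.get(r.foreignKey);
            if (left != null) {
                String combinedKey = r.foreignKey + "-" + e.getKey();
                result.put(combinedKey, "join(" + left + "," + r.payload + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = Map.of("A", "X");
        Map<String, Right> table2 = new LinkedHashMap<>();
        table2.put("a", new Right("A", 1));
        table2.put("b", new Right("A", 2));

        System.out.println(joinKeyedByLeft(table1, table2));        // {A=join(X,2)}
        System.out.println(joinKeyedByCombinedKey(table1, table2)); // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

With the left key alone, only the last emitted join result for "A" remains; with the combined key, both results are kept, matching the <A-a, join(X,1)>, <A-b, join(X,2)> output described in the thread.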