Hi All,

Sorry for the delay; holidays and all. I have since updated the KIP with John's original suggestion and pruned a number of diagrams that are no longer relevant. Any further comments are welcome; otherwise, I will look to kick off the vote again shortly.
Thanks,
Adam

On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi John and Guozhang,

Ah yes, I lost that in the mix! Thanks for the convergent solutions. I do think that the attachment John included makes for a better design. It should also help overall performance, since very high-cardinality foreign-keyed data (say, millions of events with the same entity) will be able to leverage multiple nodes for the join instead of having it all performed on one node. There is still a bottleneck in the right table having to propagate all those events, but with slimmer structures, less IO, and no need to perform the join, I think throughput will be much higher in those scenarios.

Okay, I am convinced. I will update the KIP with a Gliffy version of John's diagram and make sure the example flow matches it. Then I can go back to working on the PR to match the diagram.

Thanks to both of you for all the help; it is very much appreciated.

Adam

On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi John,

Just made a pass on your diagram (nice hand-drawing, btw!), and obviously we are thinking about the same thing :) One neat difference that I like is that in the pre-join repartition topic we can still send messages in the format `K=k, V=(i=2)` while using "i" as the partition key in the StreamPartitioner. This way we do not even need to augment the key for the repartition topic; we just project the foreign-key part of the value and trim all other fields. As long as we still materialize the store as `A-2`, co-located with the right KTable, that is fine.

As I mentioned in my previous email, I also think this has a few advantages in saving over-the-wire bytes as well as disk bytes.
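A minimal sketch of the routing idea Guozhang describes, assuming String keys and a stand-in hash function: the subscription record keeps the LHS key (`A`) as its record key and carries only the foreign key (`2`) as its value, while the partition is computed from that foreign key, so the record lands on the same partition as the right-hand KTable record for key `2`. The class and method names are hypothetical; in Kafka Streams the equivalent hook would be a `StreamPartitioner` for the repartition topic, which is left out here to keep the example dependency-free.

```java
// Hypothetical sketch: route a subscription by its foreign key while keeping
// the LHS primary key as the record key (no key augmentation needed).
final class FkPartitionSketch {

    /** Subscription record: key = LHS primary key, value = only the foreign key. */
    static final class Subscription {
        final String lhsKey;
        final String foreignKey;

        Subscription(String lhsKey, String foreignKey) {
            this.lhsKey = lhsKey;
            this.foreignKey = foreignKey;
        }
    }

    /**
     * Stand-in partition function (assumption): Kafka's default partitioning
     * hashes the serialized key bytes instead. Any deterministic function works
     * as long as the RHS table and the subscriptions use the same one.
     */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Partition chosen from the value's foreign key, not from the record key. */
    static int partitionForSubscription(Subscription s, int numPartitions) {
        return partitionFor(s.foreignKey, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        // "LHS key A wishes to receive updates for RHS key 2"
        Subscription sub = new Subscription("A", "2");
        int rhsPartition = partitionFor("2", numPartitions);
        int subPartition = partitionForSubscription(sub, numPartitions);
        System.out.println(rhsPartition == subPartition); // prints "true"
    }
}
```

The design choice is that the record key stays untouched and only the routing decision looks at the value, which is why no combined key has to travel over the wire.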
Guozhang

On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io> wrote:

Hi Guozhang,

Thanks for taking a look! I think Adam has already addressed your questions as well as I could have.

Hi Adam,

Thanks for updating the KIP. It looks great, especially how all the need-to-know information is right at the top, followed by the details.

Also, thanks for that high-level diagram. Actually, now that I'm looking at it, I think part of my proposal got lost in translation, although I do think that what you have there is also correct.

I sketched up a crude diagram based on yours and attached it to the KIP (I'm not sure whether attached or inline images work on the mailing list):
https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
It's also attached to this email for convenience.

Hopefully, you can see how it's intended to line up, and which parts are modified.
At a high level, instead of performing the join on the right-hand side, we're essentially just registering interest, like "LHS key A wishes to receive updates for RHS key 2". Then, whenever there is a new "interest" or any update to the RHS records, the RHS "broadcasts" its state back to the LHS records that are interested in it.

Thus, instead of sending the LHS values to the RHS joiner workers and then sending the join results back to the LHS worker to be co-partitioned and validated, we instead send only the LHS *keys* to the RHS workers and then only the RHS k/v back to be joined by the LHS worker.

I've been considering both your diagram and mine, and I *think* what I'm proposing has a few advantages.

Here are some points of interest as you look at the diagram:
* When we extract the foreign key and send it to the Pre-Join Repartition Topic, we can send only the FK/PK pair.
  There's no need to worry about custom partitioner logic, since we can just use the foreign key plainly as the repartition record key. Also, we save on transmitting the LHS value, since we only send its key in this step.
* We also only need to store the RHSKey:LHSKey mapping in the MaterializedSubscriptionStore, saving on disk. We can use the same RocksDB key format you proposed and the same algorithm involving range scans when the RHS records get updated.
* Instead of joining on the right side, all we do is compose a re-repartition record so we can broadcast the RHS k/v pair back to the original LHS partition (this is what the "rekey" node is doing).
* Then, there is a special kind of Joiner that's co-resident in the same StreamTask as the LHS table, subscribed to the Post-Join Repartition Topic.
** This Joiner is *not* triggered directly by any changes in the LHS KTable. Instead, LHS events trigger the join indirectly, via the whole lifecycle.
** For each event arriving from the Post-Join Repartition Topic, the Joiner looks up the corresponding record in the LHS KTable. It validates the FK as you noted, discarding any inconsistent events. Otherwise, it unpacks the RHS k/v pair and invokes the ValueJoiner to obtain the join result.
** Note that the Joiner itself is stateless, so materializing the join result is optional, just as with the 1:1 joins.

So in summary:
* Instead of transmitting the LHS keys and values to the right and the JoinResult back to the left, we only transmit the LHS keys to the right and the RHS values to the left. Assuming the average RHS value is smaller than or equal to the average join result size, it's a clear win on broker traffic. I think this is actually a reasonable assumption, which we can discuss more if you're suspicious.
* We only need one copy of the data (the left and right tables need to be materialized) and one extra copy of the PK:FK pairs in the Materialized Subscription Store. Materializing the join result is optional, just as with the existing 1:1 joins.
* We still need the fancy range-scan algorithm on the right to locate all interested LHS keys when an RHS value is updated, but we don't need a custom partitioner for either repartition topic (this is, of course, a modification we could make to your version as well).

How does this sound to you? (And did I miss anything?)
-John

On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi John & Guozhang,

@John & @Guozhang Wang <wangg...@gmail.com>: I have cleaned up the KIP, pruned much of what I wrote, and put a simplified diagram near the top to illustrate the workflow. I placed Jan's content at the bottom of the document. I believe it is far simpler to read now.

@Guozhang Wang <wangg...@gmail.com>:
> #1: rekey left table
> -> source from the left upstream, send to rekey-processor to generate combined key, and then sink to copartition topic.
Correct.

> #2: first-join with right table
> -> source from the right table upstream, materialize the right table.
> -> source from the co-partition topic, materialize the rekeyed left table, join with the right table, rekey back, and then sink to the rekeyed-back topic.
Almost. I cleared this up in the KIP. We do not rekey back yet, as I need the foreign-key value generated in #1 above to compare in the resolution stage.

> #3: second join
> -> source from the rekeyed-back topic, materialize the rekeyed back table.
> -> source from the left upstream, materialize the left table, join with the rekeyed back table.
Almost. As each event comes in, we just run it through a stateful processor that looks up its key in the original ("This") KTable. The foreignKeyExtractor is then applied to that value payload again, as in Part #1 above, to get the current foreign key. We then compare it to the foreign key of the joined event that we are currently resolving. If they have the same foreign key, we propagate the result. If they don't, we throw the event away.

The end result is that we do need to materialize two additional tables (the left/this combined-key table and the final joined table), as I've illustrated in the updated KIP. I hope the diagram makes this much clearer. Please let me know.

Thanks again,
Adam

On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:

John,

Thanks a lot for the suggestions on refactoring the wiki. I agree with you that the KIP proposal should be easy to understand for anyone reading it in the future, and hence it should provide a good summary of the user-facing interfaces, as well as of the rejected alternatives, to briefly represent "how we arrived at this conclusion, and what we argued, disagreed, and agreed about, etc." so that readers do not need to dig into the DISCUSS thread to get all the details. We can, of course, keep implementation details like the "workflows" on the wiki page as an addendum section, since they are also related.

Regarding your proposal in comment 6): that's a very interesting idea!
Just to make sure I understand it correctly: the resulting topology is still the same as in the current proposal, where we will have 3 sub-topologies for this operator:

#1: rekey left table
-> source from the left upstream, send to the rekey-processor to generate the combined key, and then sink to the copartition topic.

#2: first join with right table
-> source from the right table upstream, materialize the right table.
-> source from the copartition topic, materialize the rekeyed left table, join with the right table, rekey back, and then sink to the rekeyed-back topic.

#3: second join
-> source from the rekeyed-back topic, materialize the rekeyed-back table.
-> source from the left upstream, materialize the left table, join with the rekeyed-back table.

Sub-topologies #1 and #3 may be merged into a single sub-topology, since both of them read from the left table source stream. In this workflow, we need to materialize 4 tables (left table in #3, right table in #2, rekeyed left table in #2, rekeyed-back table in #3) and use 2 repartition topics (copartition topic, rekeyed-back topic).

Compared with Adam's current proposal in the workflow overview, it has the same number of materialized tables (left table, rekeyed left table, right table, out-of-order resolver table) and the same number of internal topics (two). The advantage is that on the copartition topic we can save bandwidth by not sending the value, and in #2 the rekeyed left table is smaller, since we do not have any values to materialize. Is that right?
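Both the rekeyed left table in #2 and the MaterializedSubscriptionStore in John's version only need keys: a combined key made of the foreign key followed by the LHS primary key, with no value. The sketch below assumes String keys, uses a `TreeSet` as a stand-in for a sorted RocksDB keyspace, and uses a hypothetical length-prefixed encoding so that a range scan over one foreign key cannot pick up keys belonging to a longer foreign key with the same leading characters (e.g. `1` vs `12`). It illustrates the range-scan lookup only and is not the KIP's actual key format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch of a key-only subscription store: a TreeSet stands in
// for a sorted RocksDB keyspace, and a prefix range scan finds every LHS key
// subscribed to a given RHS (foreign) key.
final class SubscriptionStoreSketch {

    private final NavigableSet<String> keys = new TreeSet<>();

    // Length-prefixing the foreign key (an assumed encoding) keeps FK "1" from
    // matching keys that belong to FK "12". Supports FKs up to 9999 chars.
    static String prefix(String foreignKey) {
        return String.format(Locale.ROOT, "%04d", foreignKey.length()) + foreignKey;
    }

    static String combinedKey(String foreignKey, String lhsKey) {
        return prefix(foreignKey) + lhsKey;
    }

    void subscribe(String foreignKey, String lhsKey) {
        keys.add(combinedKey(foreignKey, lhsKey));
    }

    void unsubscribe(String foreignKey, String lhsKey) {
        keys.remove(combinedKey(foreignKey, lhsKey));
    }

    /** Range scan: all keys sharing the prefix are contiguous in sorted order. */
    List<String> subscribersOf(String foreignKey) {
        String p = prefix(foreignKey);
        List<String> lhsKeys = new ArrayList<>();
        for (String k : keys.tailSet(p, true)) {
            if (!k.startsWith(p)) {
                break; // past the last key with this prefix
            }
            lhsKeys.add(k.substring(p.length()));
        }
        return lhsKeys;
    }

    public static void main(String[] args) {
        SubscriptionStoreSketch store = new SubscriptionStoreSketch();
        store.subscribe("1", "A");
        store.subscribe("12", "B");
        store.subscribe("1", "C");
        System.out.println(store.subscribersOf("1"));  // prints "[A, C]"
        System.out.println(store.subscribersOf("12")); // prints "[B]"
    }
}
```

When an RHS record for key `1` changes, the RHS side would call something like `subscribersOf("1")` and send the new RHS k/v to each returned LHS key; when an LHS record changes its foreign key, it unsubscribes from the old one and subscribes to the new one.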
Guozhang

On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:

Hi Adam,

Given that the committers are all pretty busy right now, I think it would help if you refactored the KIP a little to reduce the workload for reviewers.

I'd recommend the following changes:
* Relocate all internal details to a section at the end called something like "Implementation Notes".
* Rewrite the rest of the KIP to be as succinct as possible and mention only publicly facing API changes.
** For example, the interface that you've already listed there, as well as a textual description of the guarantees we'll be providing (the join result is copartitioned with the LHS, and the join result is guaranteed correct).

A good target would be for the whole main body of the KIP, including Status, Motivation, Proposal, Justification, and Rejected Alternatives, to fit "above the fold" (i.e., all on the screen at a comfortable zoom level).
I think the only real Rejected Alternative that bears mention at this point is KScatteredTable, for which you could include just the executive summary (no implementation details) and link to extra details in the Implementation Notes section.

Looking at the wiki page, ~90% of the text there is internal detail, which is useful for the skeptical reader but doesn't need to be ratified in a vote (and would be subject to change without notice in the future anyway).
There's also a lot of conflicting discussion, as you've very respectfully tried to preserve Jan's original proposal while adding your own. Isolating all this information in a dedicated section at the bottom frees the voters to focus on the public API part of the proposal, which is really all they need to consider.

Plus, it'll be clear to future readers which parts of the document are enduring and which parts are a snapshot of our implementation thinking at the time.

I'm suggesting this because I suspect the others haven't made time to review it partly because it seems daunting. If a review looks like a huge time investment, people will just keep putting it off. But if the KIP is a single page, they'll be more inclined to give it a read.

Honestly, I don't think the KIP itself is that controversial (apart from the scattered table thing; sorry, Jan). Most of the discussion has been around the implementation, which we can continue more effectively in a PR once the KIP has passed.

How does that sound?
-John

On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

1) I believe that the resolution mechanism John has proposed is sufficient: it is clean and simple, and it doesn't require additional RocksDB stores, which greatly reduces the footprint. I don't think we need to resolve based on timestamp or offset anymore, but if we decide to do that, it would be within the bounds of the existing API.
2) Is the current API sufficient, or does it need to be altered before going back to a vote?

3) A KScatteredTable implementation can always be added in a future revision; this API does not rule it out. The implementation of this function would simply be replaced with `KScatteredTable.resolve()` while still maintaining the existing API, thereby providing both features, as Jan outlined earlier. Would this work?

Thanks Guozhang, John and Jan

On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io> wrote:

Hi, all,

>> In fact, we can just keep a single final-result store with timestamps and reject values that have a smaller timestamp, is that right?

> Which output is correct should at least be decided based on the offset of the original message.

Thanks for this point, Jan.

KIP-258 is merely to allow embedding the record timestamp in the k/v store, as well as to provide a storage-format upgrade path.

I might have missed it, but I think we have yet to discuss whether it's safe or desirable simply to swap topic-ordering out for timestamp-ordering. This is a very deep topic, and I think it would only pollute the current discussion.

What Adam has proposed is safe, given the *current* ordering semantics of the system.
If we can agree on his proposal, I think we can merge the feature well before the conversation about timestamp ordering even takes place, much less reaches a conclusion. In the meantime, it would be unfortunate to have one join operator with ordering semantics different from every other KTable operator.

If and when that timestamp discussion takes place, many (all?) KTable operations will need to be updated, making the many:one join only a small marginal cost.

And, just to plug it again, I proposed an algorithm above that I believe provides correct ordering without any additional metadata, regardless of the ordering semantics. I didn't bring it up further because I felt the KIP only needs agreement on the public API, and we can discuss the implementation at leisure in a PR...

Thanks,
-John

On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

On 10.12.2018 07:42, Guozhang Wang wrote:
> Hello Adam / Jan / John,
>
> Sorry for being late on this thread! I've finally got some time this weekend to clean up a load of tasks in my queue (actually, I've also realized there are a bunch of other things I need to enqueue while cleaning them up, something I need to improve on my side).
> So here are my thoughts:
>
> Regarding the APIs: I like the API currently written in the KIP. More generally, I'd prefer to keep 1) one-to-many join functionality and 2) join types other than inner as separate KIPs, since 1) may be worth a general API refactoring that can benefit not only foreign-key joins but co-located joins as well (e.g., an extended proposal of https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup), and I'm not sure whether other join types would actually be needed (maybe left join still makes sense), so it's better to wait-for-people-to-ask-and-add than add-sth-that-no-one-uses.
>
> Regarding whether we enforce steps 3) / 4) vs. introducing a KScatteredTable for users to inject their own optimizations: I'd prefer to keep the current option as-is, and my main rationale is leaving room for optimization inside the Streams internals, plus API succinctness. For advanced users who may indeed prefer KScatteredTable to do their own optimization, but for whom using the Processor API directly is too much work, I think we can still extend the current API to support it in the future if it becomes necessary.

no internal optimization potential.
it's a myth

¯\_(ツ)_/¯

:-)

>
> Another note about step 4), resolving out-of-order data: as I mentioned before, I think with KIP-258 (embedded timestamps in key-value stores) we can actually make this step simpler than in the current proposal. In fact, we can just keep a single final-result store with timestamps and reject values that have a smaller timestamp, is that right?

Which output is correct should at least be decided based on the offset of the original message.

>
> That's all I have in mind now. Again, great appreciation to Adam for making such HUGE progress on this KIP!
>
> Guozhang

On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com> wrote:

If they don't find the time:
They usually take the opposite path from me :D
so the answer would be clear.

Hence my suggestion to vote.

On 04.12.2018 21:06, Adam Bellemare wrote:

Hi Guozhang and Matthias,

I know both of you are quite busy, but we've gotten this KIP to a point where we need more guidance on the API (perhaps a bit of a tie-breaker, if you will).
If there is anyone else you think should look at this, please tag them accordingly.

The scenario is as follows:

Current Option:
API: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
1) Rekey the data to CombinedKey, and shuffle it to the partition of the foreignKey (repartition 1)
2) Join the data
3) Shuffle the data back to the original node (repartition 2)
4) Resolve out-of-order arrival / race conditions due to foreign-key changes.

Alternate Option:
Perform #1 and #2 above, and return a KScatteredTable.
- It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
- KScatteredTable.resolve() would perform #3 and #4, but otherwise a user would be able to perform additional functions directly on the KScatteredTable (TBD; currently out of scope).
- John's analysis two emails up accurately describes the tradeoffs.

The Current Option is coded as-is.
The Alternate Option is possible, but it will require implementation details to be exposed in the API, along with some new data structures (i.e., CombinedKey).

I appreciate any insight into this.

Thanks.

On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi John,

Thanks for your feedback and assistance. I think your summary is accurate from my perspective. Additionally, I would like to add that there is a risk of inconsistent final states without performing the resolution. This is a major concern for me, as most of the data I have dealt with is produced by relational databases. We have seen a number of cases where a user in the Rails UI has modified the field (foreign key), realized they made a mistake, and then updated the field again with a new key. The events are propagated out as they are produced, and as such we have had real-world cases where these inconsistencies were propagated downstream as the final values, due to race conditions in the fanout of the data.
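The race described above can be reproduced in a few lines. This is a minimal sketch, assuming String keys and values, a `HashMap` standing in for the LHS KTable store, and a `BiFunction` standing in for the `ValueJoiner`; the class, field, and method names are hypothetical. It follows the resolution step discussed in this thread: when a join response returns to the LHS partition, re-extract the foreign key from the current LHS value and emit the result only if it matches the foreign key the response was computed for.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of the LHS-side resolution step: a join response is only
// emitted if the foreign key it was computed for still matches the foreign key
// currently extracted from the LHS record; otherwise it is stale and dropped.
final class FkResolverSketch {

    /** Response routed back to the LHS partition (field names are illustrative). */
    static final class Response {
        final String lhsKey;
        final String foreignKey; // FK the RHS lookup was performed for
        final String rhsValue;

        Response(String lhsKey, String foreignKey, String rhsValue) {
            this.lhsKey = lhsKey;
            this.foreignKey = foreignKey;
            this.rhsValue = rhsValue;
        }
    }

    private final Map<String, String> lhsTable; // stand-in for the LHS KTable store
    private final Function<String, String> foreignKeyExtractor;
    private final BiFunction<String, String, String> joiner; // stand-in for ValueJoiner
    final Map<String, String> joinResults = new HashMap<>();

    FkResolverSketch(Map<String, String> lhsTable,
                     Function<String, String> foreignKeyExtractor,
                     BiFunction<String, String, String> joiner) {
        this.lhsTable = lhsTable;
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.joiner = joiner;
    }

    /** Returns true if the response was joined and emitted, false if discarded. */
    boolean process(Response r) {
        String lhsValue = lhsTable.get(r.lhsKey);
        if (lhsValue == null) {
            return false; // LHS record no longer exists (simplified: no tombstone)
        }
        String currentFk = foreignKeyExtractor.apply(lhsValue);
        if (!currentFk.equals(r.foreignKey)) {
            return false; // FK changed since the subscription was sent: stale
        }
        joinResults.put(r.lhsKey, joiner.apply(lhsValue, r.rhsValue));
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> lhs = new HashMap<>();
        lhs.put("A", "fk=2"); // A was first saved with fk=1, then changed to fk=2
        FkResolverSketch resolver = new FkResolverSketch(
                lhs, v -> v.substring("fk=".length()), (l, rhs) -> l + "|" + rhs);

        // Fanout race: the response for fk=2 arrives before the stale one for fk=1.
        System.out.println(resolver.process(new Response("A", "2", "rhs-2"))); // true
        System.out.println(resolver.process(new Response("A", "1", "rhs-1"))); // false
        System.out.println(resolver.joinResults.get("A")); // fk=2|rhs-2
    }
}
```

Without the foreign-key check, the late `rhs-1` response would overwrite `fk=2|rhs-2` and become the final value, which is the inconsistent end state described above. Deletions are simplified here to a silent drop.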
The solution I propose values correctness of the final result over other factors.

We could always move this function over to a KScatteredTable implementation in the future and simply deprecate this join API in time. I would like to hear more from some of the other major committers on which course of action they think is best before any more coding is done.

Thanks again,

Adam

On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote:

Hi Jan and Adam,

Wow, thanks for doing that test, Adam. Those results are encouraging.

Thanks for sharing your performance experience as well, Jan. I agree that avoiding unnecessary join outputs is especially important when the fan-out is so high. I suppose this could also be built into the implementation we're discussing, but it wouldn't have to be specified in the KIP (since it's an API-transparent optimization).
As for whether or not to re-repartition the data, I didn't bring it up because it sounded like the two of you had agreed to leave the KIP as-is, despite the disagreement.

If you want my opinion, I feel like both approaches are reasonable.
It sounds like Jan places more value on the potential for developers to optimize their topologies by reusing the intermediate nodes, whereas Adam places more value on having a single operator that people can use without extra steps at the end.

Personally, although I do find it exceptionally annoying when a framework gets in my way while I'm trying to optimize something, it seems better to go for a single operation.
* Encapsulating the internal transitions gives us significant latitude in the implementation (for example, joining only at the end, not in the middle, to avoid extra data copying and out-of-order resolution; how we represent the first repartition keys (combined keys vs. value vectors); etc.).
  If we publish something like a KScatteredTable with the right-partitioned joined data, then the API pretty much locks in the implementation as well.
* The API seems simpler to understand and use. I do mean "seems"; if anyone wants to make the case that KScatteredTable is actually simpler, I think hypothetical usage code would help. From a relational-algebra perspective, it seems like KTable.join(KTable) should produce a new KTable in all cases.
* That said, there might still be room in the API for a different operation, like what Jan has proposed, to scatter a KTable and then do things like join, re-group, etc. from there... I'm not sure; I haven't thought through all the consequences yet.

This is all just my opinion after thinking over the discussion so far...
-John

On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Updated the PR to take John's feedback into account.

I did some preliminary testing of the performance of the prefixScan.
I have attached the file, but I will also include the text in the body here for archival purposes (I am not sure what happens to attached files). I also updated the PR and the KIP accordingly.

Summary: It scales exceptionally well for scanning large numbers of records. As Jan mentioned previously, the real issue would be more around processing the resulting records after obtaining them. For instance, it takes approximately 80-120 ms to flush the buffer and a further 35-85 ms to scan 27.5M records, obtaining matches for 2.5M of them. Iterating through the records just to generate a simple count takes about 40 times longer than the flush + scan combined.

========================================
Setup:
========================================
Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
CPU: i7 2.2 GHz

Note: I am using a slightly-modified, directly-accessible Kafka Streams RocksDB implementation (RocksDB.java, basically just avoiding the ProcessorContext). There are no modifications to the default RocksDB values provided in the 2.1/trunk release.

keysize = 128 bytes
valsize = 512 bytes

Step 1: Write X positive matching events (key = prefix + left-padded auto-incrementing integer)
Step 2: Write 10X negative matching events (key = left-padded auto-incrementing integer)
Step 3: Perform flush
Step 4: Perform prefixScan
Step 5: Iterate through the returned Iterator and validate the count of expected events
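Steps 1, 2, 4 and 5 above rely on the store keeping keys in sorted order, so that all keys sharing a prefix form one contiguous run. The following is a minimal in-memory sketch of those steps at toy scale. It is an illustration only, not the Kafka Streams or RocksDB API. A java.util.TreeMap stands in for the sorted store, Step 3 (flush) has no analogue, and the class and method names (PrefixScanSketch, populate, prefixScan) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model: a sorted in-memory map stands in for RocksDB.
// This is not the Kafka Streams state store API.
final class PrefixScanSketch {

    // Fixed-width, left-padded integers, as in the benchmark keys.
    static String pad(int i) {
        return String.format("%010d", i);
    }

    // Steps 1 and 2: X "positive" keys with the prefix, 10X "negative" keys without it.
    static NavigableMap<String, String> populate(String prefix, int x) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (int i = 0; i < x; i++) {
            store.put(prefix + pad(i), "positive");
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(pad(i), "negative");
        }
        return store;
    }

    // Steps 4 and 5: seek to the first key >= prefix, then iterate until a key
    // no longer starts with the prefix. Sorted keys make all matches contiguous.
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // past the contiguous run of matching keys
            }
            matches.add(entry.getKey());
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = populate("A-", 1_000);
        List<String> hits = prefixScan(store, "A-");
        // prints "11000 entries, 1000 prefix matches"
        System.out.println(store.size() + " entries, " + hits.size() + " prefix matches");
    }
}
```

Finding the start of the matching range is a single seek. All per-record work happens in the loop body, once per match.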

========================================
Results:
========================================
X      Total events  Run    Flush time  Scan time  Disk usage
1k     11k           -      39 ms       7 ms       6.9 MB
10k    110k          -      45 ms       8 ms       127 MB
100k   1.1M          Test1  60 ms       12 ms      678 MB
100k   1.1M          Test2  45 ms       7 ms       576 MB
1M     11M           Test1  52 ms       19 ms      7.2 GB
1M     11M           Test2  84 ms       34 ms      9.1 GB
2.5M   27.5M         Test1  82 ms       63 ms      17 GB (276 SST files)
2.5M   27.5M         Test2  116 ms      35 ms      23 GB (361 SST files)
2.5M   27.5M         Test3  103 ms      82 ms      19 GB (300 SST files)

I had to limit my testing on my laptop to X = 2.5M events. I tried to go to X = 10M (110M events), but RocksDB was going into the 100GB+ range and my laptop ran out of disk. More extensive testing could be done, but I suspect that it would be in line with what we're seeing in the results above.

At this point in time, I think the only major discussion point is really around what Jan and I have disagreed on: repartitioning back + resolving potential out-of-order issues, or leaving that up to the client to handle.

Thanks folks,

Adam

On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

On 29.11.2018 15:14, John Roesler wrote:
> Hi all,
>
> Sorry that this discussion petered out... I think the 2.1 release caused an extended distraction that pushed it off everyone's radar (which was precisely Adam's concern). Personally, I've also had some extended distractions of my own that kept (and continue to keep) me preoccupied.
>
> However, calling for a vote did wake me up, so I guess Jan was on the right track!
>
> I've gone back and reviewed the whole KIP document and the prior discussion, and I'd like to offer a few thoughts:
>
> API Thoughts:
>
> 1. If I read the KIP right, you are proposing a many-to-one join. Could we consider naming it manyToOneJoin? Or, if you prefer, flip the design around and make it a oneToManyJoin?
>
> The proposed name "joinOnForeignKey" disguises the join type, and it seems like it might trick some people into using it for a one-to-one join. This would work, of course, but it would be super inefficient compared to a simple rekey-and-join.
>
> 2. I might have missed it, but I don't think it's specified whether it's an inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ), the rest can be achieved by filtering or by handling it in the ValueJoiner.
>
> 3. The arg list to joinOnForeignKey doesn't look quite right.
> 3a.
> Regarding Serialized: There are a few different paradigms in play in the Streams API, so it's confusing, but instead of three Serialized args, I think it would be better to have one that allows (optionally) setting the 4 incoming serdes. The result serde is defined by the Materialized. The incoming serdes can be optional because they might already be available on the source KTables, or the default serdes from the config might be applicable.
>
> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting one, and it seems like it might actually be harmful, since the rekey operation needs to produce results that are co-partitioned with the "other" KTable.
>
> 4. I'm fine with the "reserved word" header, but I didn't actually follow what Matthias meant about namespacing requiring "deserializing" the record header. The headers are already Strings, so I don't think that deserialization is required.
> If we applied the namespace at source nodes and stripped it at sink nodes, this would add practically no overhead. The advantage of the namespace idea is that no public API change with respect to headers needs to happen, and no restrictions need to be placed on users' headers.
>
> (Although I'm wondering if we can get away without the header at all... stay tuned)
>
> 5. I also didn't follow the discussion about the HWM (high-water mark) table growing without bound. As I read it, the HWM table is effectively implementing OCC (optimistic concurrency control) to resolve the problem you noted with disordering when the rekey is reversed... particularly notable when the FK changes. As such, it only needs to track the most recent "version" (the offset in the source partition) of each key. Therefore, it should have the same number of keys as the source table at all times.
>
> I see that you are aware of KIP-258, which I think might be relevant in a couple of ways.
> One: it's just about storing the timestamp in the state store, but the ultimate idea is to effectively use the timestamp as an OCC "version" to drop disordered updates. You wouldn't want to use the timestamp for this operation, but if you were to use a similar mechanism to store the source offset in the store alongside the re-keyed values, then you could avoid a separate table.
>
> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking at all and resolve out-of-order during a final join instead...
>
> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
>
> Left table:
> 1: (A, xyz)
> 2: (B, asd)
>
> Right table:
> A: EntityA
> B: EntityB
>
> We could do a rekey as you proposed with a combined key, but not propagating the value at all..
>
> Rekey table:
> A-1: (dummy value)
> B-2: (dummy value)
>
> Which we then join with the right table to produce:
> A-1: EntityA
> B-2: EntityB
>
> Which gets rekeyed back:
> 1: A, EntityA
> 2: B, EntityB
>
> And finally we do the actual join:
> Result table:
> 1: ((A, xyz), EntityA)
> 2: ((B, asd), EntityB)
>
> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
>
> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.
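The final-join check in point 6 can be illustrated without Kafka at all. Below is a hypothetical single-process simulation. It uses Java 16+ records, and its class and method names are illustrative, not taken from the KIP or the PR. It collapses the rekey to a combined FK-PK key and the right-table lookup into one method. The final join then drops any response whose FK no longer matches the left record's current FK. The replay uses the KIP's example, in which 1: (A, xyz) becomes 1: (B, xyz) and the two responses are processed out of order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical simulation of resolving out-of-order results in the final join.
// Not the KIP's or the PR's implementation; no Kafka APIs are used.
final class ForeignKeyJoinSketch {

    // Left-table value: a foreign key into the right table plus other data.
    record LeftValue(String fk, String data) {}

    // What the right-partitioned side sends back once the rekey is reversed:
    // the original left key, the FK that was looked up, and the right value (null if absent).
    record Response(int leftKey, String fk, String rightValue) {}

    final Map<Integer, LeftValue> left = new HashMap<>();
    final Map<String, String> right = new HashMap<>();
    final Map<Integer, String> result = new HashMap<>();

    // Rekey to the combined key FK-PK (carrying no left value) and join with the right table.
    Response lookupOnRightSide(int leftKey, String fk) {
        return new Response(leftKey, fk, right.get(fk));
    }

    // Final join after rekeying back to the left key. If the left record is gone or its
    // current FK differs from the response's FK, the response is outdated and is dropped.
    void finalJoin(Response response) {
        LeftValue current = left.get(response.leftKey());
        if (current == null || !Objects.equals(current.fk(), response.fk())) {
            return;
        }
        result.put(response.leftKey(),
                "(" + current.fk() + ", " + current.data() + ") + " + response.rightValue());
    }

    public static void main(String[] args) {
        ForeignKeyJoinSketch join = new ForeignKeyJoinSketch();
        join.right.put("A", "EntityA");
        join.right.put("B", "EntityB");

        join.left.put(1, new LeftValue("A", "xyz"));
        Response staleA = join.lookupOnRightSide(1, "A");

        // 1: (A, xyz) is updated to 1: (B, xyz) before the A response is processed.
        join.left.put(1, new LeftValue("B", "xyz"));
        Response freshB = join.lookupOnRightSide(1, "B");

        join.finalJoin(freshB);
        join.finalJoin(staleA); // dropped: current FK is B, response FK is A
        System.out.println(join.result.get(1)); // prints "(B, xyz) + EntityB"
    }
}
```

In either processing order, the final result for key 1 is the join on B. No per-key offset table is needed for this case, because the comparison uses only the current left value.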

Will check Adam's patch, but this should work. As mentioned often, I am not convinced about partitioning back for the user automatically. I think this is the real performance eater ;)

> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?

Make it work, make it fast, make it beautiful. The topmost thing here is / was correctness. In practice I do not measure the performance of the range scan. The usual cases I run this with emit 500k to 1M rows on a left-hand-side change. The range scan is just the work you've got to do, even when you pack your data into different formats; RocksDB performance is usually tightly bound to the size of the data, and we can't really change that.
It is more important for users to prevent useless updates to begin with. My left-hand side is guarded to drop changes that are not going to change my join output.

Usually it's:

drop unused fields and then don't forward if old.equals(new)

Regarding the performance of creating an iterator for smaller fan-outs, users can still just do a groupBy first anyway.

> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
>
> Left table:
> 1: A,...
> 2: B,...
> 3: A,...
>
> Gets re-keyed:
> A: [1, 3]
> B: [2]
>
> Then, the rhs part of the join would only need a regular single-key lookup. Of course we have to deal with the problem of large values, as there's no bound on the number of lhs records that can reference rhs records.
> Offhand, I'd say we could page the values, so when one row is past the threshold, we append the key for the next page. Then in most cases, it would be a single key lookup, but for large fan-out updates, it would be one lookup per page, i.e. per (max value size)/(avg lhs key size) keys.
>
> This seems more complex, though... Plus, I think there's some extra tracking we'd need to do to know when to emit a retraction. For example, when record 1 is deleted, the re-key table would just have (A: [3]). Some kind of tombstone is needed so that the join result for 1 can also be retracted.
>
> That's all!
>
> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the discussion has been slow.
> -John

On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

I'd say you can just call the vote.

That happens all the time, and if something comes up, it just goes back to discussion.

I would not expect too much attention from another email in this thread.

Best, Jan

On 09.10.2018 13:56, Adam Bellemare wrote:
Hello Contributors

I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.

Main points that need addressing:
1) Any way (or benefit) in structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.

2) Overall strategy for joining + resolving. This thread has much discourse between Jan and me on the current high-water mark proposal versus a groupBy + reduce proposal.
I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.

3) Anything else that you see that would prevent this from moving to a vote?

Thanks

Adam

On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Jan

With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the number of segments you want and how large they are. To the best of my understanding, what happens is that the segments are automatically rolled over as new data with new timestamps are created. We use this exact functionality in some of the work done internally at my company.
For reference, this is the hopping windowed store.
https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21

In the code that I have provided, there are going to be two 24h segments. When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), such that it is the record time, not the clock time, which is used.

To the best of my understanding, the timestamps are retained when restoring from the changelog.
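The segment-level expiry described above can be approximated with a small model. This sketch rests on stated assumptions and does not reproduce the behaviour of the Kafka Streams WindowStore or Stores.persistentWindowStore, including their retention, segment-interval and grace semantics. Here, windows are 24h long and a new one starts every 12h, so each record lands in two overlapping windows, as described. Time advances only with record timestamps. A whole window is dropped once stream time passes its end. The class and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplified model of segment-level TTL driven by record time.
// Not the Kafka Streams window store API.
final class SegmentedTtlSketch {
    static final long HOUR = 3_600_000L;
    static final long SIZE = 24 * HOUR;    // each window is 24h long
    static final long ADVANCE = 12 * HOUR; // a new window starts every 12h, so windows overlap by 12h

    // Window start time -> contents of that window.
    final TreeMap<Long, Map<String, String>> windows = new TreeMap<>();
    long streamTime = Long.MIN_VALUE;

    // Writes the record into every window covering its record timestamp, then drops
    // every window that ended at or before stream time (expiry is per window, not per record).
    void put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long latestStart = timestamp - Math.floorMod(timestamp, ADVANCE);
        for (long start = latestStart; start > timestamp - SIZE; start -= ADVANCE) {
            windows.computeIfAbsent(start, s -> new HashMap<>()).put(key, value);
        }
        windows.headMap(streamTime - SIZE, true).clear();
    }

    // Returns the value from the newest surviving window that holds the key, or null.
    String get(String key) {
        for (Map<String, String> window : windows.descendingMap().values()) {
            String value = window.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SegmentedTtlSketch store = new SegmentedTtlSketch();
        store.put("k", "v", 0L);
        store.put("other", "x", 12 * HOUR);
        System.out.println(store.get("k")); // prints "v": the window starting at 0 is still alive
        store.put("other", "y", 24 * HOUR);
        System.out.println(store.get("k")); // prints "null": every window holding "k" has expired
    }
}
```

A key written at time 0 is still readable at 12h, because one of its two windows survives. It disappears once stream time reaches 24h, without any per-record bookkeeping.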

Basically, this is a heavy-handed way to deal with TTL at a segment level, instead of at an individual record level.

On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com> wrote:

Will that work? I expected it to blow up with a ClassCastException or similar.

You would either have to specify the window you fetch/put or iterate across all windows the key was found in, right?

I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.

If it does: did you see my comment on checking all the windows earlier? That would be needed to actually give reasonable time guarantees.

Best

On 25.09.2018 13:18, Adam Bellemare wrote:
Hi Jan

Check for "highwaterMat" in the PR. I only changed the state store, not the ProcessorSupplier.
Thanks,
Adam

On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 24.09.2018 16:26, Adam Bellemare wrote:

@Guozhang

Thanks for the information. This is indeed something that will be extremely useful for this KIP.

@Jan
Thanks for your explanations. That said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution you propose. However, if you wish to implement it yourself off of my current PR and submit it as a competing alternative, I would be more than happy to help vet it as an alternate solution.
As it stands right now, I do not really have more time to invest in alternatives without a strong indication from the binding voters of which one they would prefer.

Hey, no worries at all. I personally gave up on the Streams DSL some time ago; otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on the PAPI.

I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that some of the binding voters can take a look at the KIP in the wiki.
I have updated it according to the latest plan:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
https://github.com/apache/kafka/pull/5527

Thanks,

Adam

Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to Windowed<K>,Long. Am I missing something?

On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Correction on my previous email: KAFKA-5533 is the wrong link, as it is for the corresponding changelog mechanisms.
But as part of KIP-258 we do want to have "handling out-of-order data for source KTable", such that instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamp, i.e. following timestamp ordering.

Guozhang

On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

Thanks for the explanation. Regarding the final step (i.e.
the high watermark store, now to be replaced with a window store), I think another ongoing KIP may actually help:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB

This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTables), and one of its uses, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp.
I think it is very similar to what you have in mind for high-watermark-based filtering; you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.

Note that this KIP is for key-value stores and hence non-windowed KTables only, but we do not really have good support for windowed KTable joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107), so I think we can just consider non-windowed KTable-KTable non-key joins for now. In that case, KIP-258 should help.
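The timestamp-ordering behaviour Guozhang describes for KIP-258 can be sketched as follows. This is a hypothetical plain-Java model (`TimestampedTable` is not a Kafka class), assuming each update carries its record timestamp: rather than applying updates in offset order, an update older than the key's stored timestamp is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a KTable store that keeps a timestamp next to each value and follows
// timestamp ordering: updates older than the key's current timestamp are rejected.
class TimestampedTable<K, V> {
    private static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Applies the update unless it is older than the latest one seen for this key.
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order update; offset ordering would have applied it blindly
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

Equal timestamps are accepted here so that offset order breaks ties; that tie-breaking rule is an assumption of the sketch, not something stated in the thread.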
Guozhang

On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 11.09.2018 18:00, Adam Bellemare wrote:

Hi Guozhang

The current highwater mark implementation would grow endlessly, based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a Windowed state store of Duration N.
This would allow the same behaviour but cap the size based on time. This should allow all late-arriving events to be processed, and should be customizable by the user to suit their own needs (i.e. perhaps just a 10-minute window, or perhaps 7 days...).

Hi Adam, using time-based retention can do the trick here, even if I would still like to see the automatic repartitioning made optional, since I would just reshuffle again. With a windowed store I am a little bit sceptical about how to determine the window.
So essentially one could run into problems when a rapid change happens near a window border. I will check your implementation in detail; if it's problematic, we could still check _all_ windows on read with, I guess, not too bad a performance impact. I will let you know if the implementation would be correct as is. I would not like to assume that offset(A) < offset(B) => timestamp(A) < timestamp(B); I don't think we can expect that.

@Jan
I believe I understand what you mean now. Thanks for the diagram, it really did help.
You are correct that I do not have the original primary key available, and I can see that if it were available then you would be able to add and remove events from the Map. That being said, I encourage you to finish your diagrams/charts, just for clarity for everyone else.

Yeah, 100%. This Gliffy thing is just really hard work, but I understand the benefits for the rest. Sorry about the original primary key: we have implemented our own join and groupBy in the PAPI and basically do not use any of the DSL (just the abstraction).
I completely missed that it is not there in the original DSL and just assumed it. Total brain mix-up on my end. I will finish the chart as soon as I get a quiet evening this week.

My follow-up question for you is: won't the Map stay inside the state store indefinitely after all of the changes have propagated? Isn't this effectively the same as a highwater mark state store?

The thing is that if the map is empty, the subtractor is going to return `null` and the key is removed from the keyspace.
But there is going to be a store, 100%. The good thing is that I can use this store directly for materialize() / enableSendingOldValues(); it is a regular store, satisfying all the guarantees needed for a further groupBy / join. The windowed store does not keep the values, so for the next stateful operation we would need to instantiate an extra store, or have the window store hold the values as well.

Long story short: if we can flip in a custom groupBy before repartitioning to the original primary key, I think it would help users big time in building efficient apps.
Given the original primary key issue, I understand that we do not have a solid foundation to build on, leaving the primary key carry-along to the user. Very unfortunate. I can understand the decision going that way; I do not think it's a good decision.

Thanks
Adam

On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com> wrote:

please remove me from this group

On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Adam,

Give me some time, I will make such a chart. Last time I didn't get along well with Gliffy and ruined all your charts. Hopefully I can get it done today.

On 08.09.2018 16:00, Adam Bellemare wrote:
Hi Jan

I have included a diagram of what I attempted on the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate

I attempted this back at the start of my own implementation of this solution, and since I could not get it to work I have since discarded the code. At this point in time, if you wish to continue pursuing your groupBy solution, I ask that you please create a diagram on the KIP carefully explaining your solution.
Please feel free to use the image I just posted as a starting point. I am having trouble understanding your explanations, but I think that a carefully constructed diagram will clear up any misunderstandings. Alternatively, please post a comprehensive PR with your solution. I can only guess at what you mean, and since I value my own time as much as you value yours, I believe it is your responsibility to provide an implementation instead of me trying to guess.
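The groupBy-based alternative under discussion keeps, per foreign key, a map of primary key to row, maintained by an adder and a subtractor; as Jan explains further up in this thread, once that map becomes empty the subtractor returns `null` and the key leaves the keyspace. A minimal plain-Java simulation of that bookkeeping, assuming each update carries the row's old and new foreign key, might look like this. `ForeignKeyGrouping` is hypothetical and does not use the Kafka Streams `KGroupedTable` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation (plain Java, not the Kafka Streams API) of a groupBy on the foreign
// key whose aggregate is a map of primaryKey -> row, maintained by an adder and a subtractor.
class ForeignKeyGrouping<PK, FK, V> {
    // Foreign key -> (primary key -> row); plays the role of the aggregate's state store.
    private final Map<FK, Map<PK, V>> table = new HashMap<>();

    // Adder: returns a copy of the aggregate with the row added (or replaced).
    private Map<PK, V> add(Map<PK, V> agg, PK pk, V value) {
        Map<PK, V> next = new HashMap<>();
        if (agg != null) {
            next.putAll(agg);
        }
        next.put(pk, value);
        return next;
    }

    // Subtractor: returns a copy without the row, or null once the map is empty.
    private Map<PK, V> subtract(Map<PK, V> agg, PK pk) {
        if (agg == null) {
            return null;
        }
        Map<PK, V> next = new HashMap<>(agg);
        next.remove(pk);
        return next.isEmpty() ? null : next;
    }

    // Moves row pk from oldFk to newFk; a null foreign key means "absent" (insert or delete).
    void update(PK pk, FK oldFk, FK newFk, V newValue) {
        if (oldFk != null) {
            Map<PK, V> after = subtract(table.get(oldFk), pk);
            if (after == null) {
                table.remove(oldFk); // null aggregate: the key disappears from the keyspace
            } else {
                table.put(oldFk, after);
            }
        }
        if (newFk != null) {
            table.put(newFk, add(table.get(newFk), pk, newValue));
        }
    }

    Map<PK, V> rowsFor(FK fk) {
        return table.get(fk);
    }

    int size() {
        return table.size();
    }
}
```

Because emptied maps are removed rather than kept, this store only holds foreign keys that currently have at least one row, which is the distinction Jan draws against an ever-growing highwater mark store.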
Adam

On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi James,

Nice to see you being interested. Kafka Streams at this point supports all sorts of joins as long as both streams have the same key. Adam is currently implementing a join where a KTable and a KTable can have a one-to-many relationship (1:n).
We exploit the fact that RocksDB is a datastore that keeps data sorted (or at least exposes an API to access the stored data in a sorted fashion).

I think the technical caveats are well understood now, and we are basically down to philosophy and API design (once Adam sees my newest message). I have a lengthy track record of losing those kinds of arguments within the Streams community, and I have no clue why.
So I literally can't wait for you to churn through this thread and give your opinion on how we should design the return type of the oneToManyJoin, and how much power we want to give to the user vs. "simplicity" (where, I argue, it isn't really simpler, as users still need to understand it).

Waiting for you to join in on the discussion.

Best Jan

On 07.09.2018 15:49, James Kwan wrote:

I am new to this group and I found this subject interesting.
Sounds like you guys want to implement a join table of two streams? Is there somewhere I can see the original requirement or proposal?

On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 05.09.2018 22:17, Adam Bellemare wrote:

> I'm currently testing using a Windowed Store to store the highwater mark.
> By all indications this should work fine, with the caveat being that it can only resolve out-of-order arrival for up to the size of the window (i.e. 24h, 72h, etc.). This would remove the possibility of it being unbounded in size.
>
> With regards to Jan's suggestion, I believe this is where we will have to remain in disagreement.
> While I do not disagree with your statement about there likely being additional joins done in a real-world workflow, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins. I have attempted what I think you have proposed (without a high-water mark, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out-of-order and be incorrectly propagated, such that an intermediate event is represented as the final event.

Can you shed some light on your groupBy implementation? There must be some sort of flaw in it. I have a suspicion where it is; I would just like to confirm. The idea is bulletproof, so it must be an implementation mess-up. I would like to clarify before we draw a conclusion.
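To make the disagreement concrete, here is a minimal sketch of the windowed high-water mark Adam describes testing, written in plain Java rather than as a Kafka Streams state store. It assumes each join result carries a per-key sequence number (for example, the offset of the originating record) and that a mark is kept only for `retentionMs`; the class, method, and parameter names are hypothetical. A late intermediate result with a lower sequence number is dropped, but once the window has expired the mark is forgotten and a stale result is forwarded again, which is exactly the caveat Adam mentions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not a Kafka Streams API.
final class WindowedHighwaterSketch {
    // Highest sequence number seen for a key, and when it was last updated.
    private static final class Mark {
        final long sequence;
        final long updatedAtMs;

        Mark(long sequence, long updatedAtMs) {
            this.sequence = sequence;
            this.updatedAtMs = updatedAtMs;
        }
    }

    private final Map<String, Mark> marks = new HashMap<>();
    private final long retentionMs;

    WindowedHighwaterSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the join result for `key` should be forwarded downstream.
    boolean shouldForward(String key, long sequence, long nowMs) {
        // Forget marks older than the retention window, bounding the state like a windowed store.
        marks.values().removeIf(m -> nowMs - m.updatedAtMs > retentionMs);
        Mark mark = marks.get(key);
        if (mark != null && sequence < mark.sequence) {
            return false; // a stale intermediate result that arrived late
        }
        marks.put(key, new Mark(sequence, nowMs));
        return true;
    }

    int trackedKeys() {
        return marks.size();
    }
}
```

For example, if record `a` changes its foreign key from `A` to `B`, the result produced via `B` (sequence 2) may arrive before the one produced via `A` (sequence 1); `shouldForward("a", 2, t)` returns true and the later `shouldForward("a", 1, t2)` returns false, as long as `t2` is still inside the window. The full scan on every call is only for brevity; a real windowed store would expire data by retention instead.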
> Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.
>
> If you have some code to share that illustrates your approach, I would be very grateful, as it would remove any misunderstandings that I may have.

Ah okay, you were looking for my code.
I don't have something easily readable here, as it's bloated with OO patterns.

It's trivial anyhow:

    @Override
    public T apply(K aggKey, V value, T aggregate)
    {
        Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
        U toModifyKey = mapper.apply(value);
        // This is the place where people are actually going to have issues, and probably
        // why you couldn't do it. We would need to find a solution here; I didn't realize that yet.
        // We propagate the field in the joiner, so that we can pick it up in an aggregate.
        // Probably you have not thought of this in your approach, right?
        // I am very open to finding a generic solution here. In my honest opinion, this is broken
        // in KTableImpl.GroupBy: it loses the keys and only maintains the aggregate key.
        // I abstracted it away back then, way before I was thinking of the oneToMany join.
        // That is why I didn't realize its significance here.
        // Opinions?

        // `current` is not defined in the original snippet; presumably the values already held in `aggregate`
        for (V m : current)
        {
            currentStateAsMap.put(mapper.apply(m), m);
        }
        if (isAdder)
        {
            currentStateAsMap.put(toModifyKey, value);
        }
        else
        {
            currentStateAsMap.remove(toModifyKey);
            if (currentStateAsMap.isEmpty()) {
                return null;
            }
        }
        return asAggregateType(currentStateAsMap);
    }

> Thanks,
> Adam

On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Thanks Adam for bringing Matthias up to speed about the differences! I think re-keying back should be optional at best.
I would say we return a KScatteredTable with reshuffle() returning KTable<originalKey,Joined> to make the backwards repartitioning optional.
I am also very much in favour of doing the out-of-order processing using group by instead of high-water mark tracking, simply because unbounded growth is scary, plus it saves us the header stuff.
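Jan's aggregator above depends on helpers he marks as imaginary (`asMap`, `asAggregateType`) and on an undefined `current`. Below is a minimal runnable sketch under the assumption that the aggregate is simply a `Map` from the original primary key to the latest joined value, which makes those helpers unnecessary. `primaryKeyOf` is a hypothetical stand-in for the mapper that, per Jan's comment, would have to be propagated through the joiner. Since `KGroupedTable.aggregate` takes separate adder and subtractor `Aggregator`s, this class would be instantiated twice, once with `isAdder = true` and once with `false`; it does not implement the Kafka interface, to stay dependency-free.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a one-to-many aggregator; the aggregate maps original key -> value.
final class OneToManyAggregatorSketch<V> {
    private final Function<V, String> primaryKeyOf; // hypothetical: recovers the original key from a value
    private final boolean isAdder;

    OneToManyAggregatorSketch(Function<V, String> primaryKeyOf, boolean isAdder) {
        this.primaryKeyOf = primaryKeyOf;
        this.isAdder = isAdder;
    }

    // Same shape as Jan's apply(aggKey, value, aggregate); aggKey is unused, as in his snippet.
    Map<String, V> apply(String aggKey, V value, Map<String, V> aggregate) {
        Map<String, V> state = new HashMap<>();
        if (aggregate != null) {
            state.putAll(aggregate); // copy instead of mutating the previous aggregate
        }
        String toModifyKey = primaryKeyOf.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null; // as in Jan's snippet, an empty aggregate becomes null
            }
        }
        return state;
    }
}
```

Keeping the original key inside the aggregate is what lets the subtractor remove exactly one entry, which is the information Jan says `KTableImpl.GroupBy` loses.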
I think the abstraction of always repartitioning back is just not so strong. The work has been done before we partition back, and grouping by something else afterwards is really common.

On 05.09.2018 13:49, Adam Bellemare wrote:

Hi Matthias,
Thank you for your feedback, I do appreciate it!

> While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead.
> I would suggest not to namespace for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.

Agreed. I will go with using a reserved string and document it.

> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,

In your example, you have table1 and table2 swapped.
Here is how it works currently:

1) table1 has the records that contain the foreign key within their value.
table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
table2 input stream: <A,X>, <B,Y>

2) A value mapper is required to extract the foreign key.
table1 foreign key mapper: ( value => value.fk )

The mapper is applied to each element in table1, and a new combined key is made:
table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>

3) The rekeyed events are copartitioned with table2:

a) Stream Thread with Partition 0:
RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
Table2: <A,X>

b) Stream Thread with Partition 1:
RepartitionedTable1: <B-c, (fk=B,bar=3)>
Table2: <B,Y>

4) From here, they can be joined together locally by applying the joiner function.

At this point, Jan's design and my design deviate. My design goes on to repartition the data post-join and resolve out-of-order arrival of records, finally returning the data keyed by just the original key. I do not expose the CombinedKey or any of the internals outside of the joinOnForeignKey function.
This does make for a larger footprint, but it removes from the user all agency for resolving out-of-order arrivals and handling CombinedKeys. I believe that this makes the function much easier to use.

Let me know if this helps resolve your questions, and please feel free to add anything else on your mind.

Thanks again,
Adam

On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I am just catching up on this thread. I did not read everything so far, but want to share a couple of initial thoughts:

Headers: I think there is a fundamental difference between header usage in this KIP and KIP-258. For 258, we add headers to changelog topics that are owned by Kafka Streams, and nobody else is supposed to write into them.
In fact, no user headers are written into the changelog topic, and thus there are no conflicts.

Nevertheless, I don't see a big issue with using headers within Streams. As long as we document it, we can have some "reserved" header keys that users are not allowed to use when processing data with Kafka Streams. IMHO, this should be ok.
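If the reserved-key rule were enforced in code rather than only documented, it could be as simple as rejecting user header keys that start with a reserved prefix. This is a minimal sketch under that assumption: the prefix `__streams.` is purely hypothetical (the thread only says a reserved string will be chosen and documented), and in practice the keys would come from each record's headers via `Header.key()` in the Kafka client API.

```java
import java.util.Collection;

// Hypothetical enforcement of a documented "reserved header key" rule.
final class ReservedHeaderCheck {
    // Hypothetical reserved prefix; the real string would be chosen and documented by the KIP.
    static final String RESERVED_PREFIX = "__streams.";

    // Throws if any user-supplied header key falls inside the reserved namespace.
    static void validateUserHeaderKeys(Collection<String> headerKeys) {
        for (String key : headerKeys) {
            if (key.startsWith(RESERVED_PREFIX)) {
                throw new IllegalArgumentException(
                        "Header key '" + key + "' is reserved for internal use by Kafka Streams");
            }
        }
    }
}
```

A prefix check like this costs only a string comparison per header key, which avoids the deserialization overhead Matthias raises for full namespacing.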
I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
For internal and changelog topics, we can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey

While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead.
I would suggest not to namespace for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.

My main concern about the design is the type of the result KTable: If I understood the proposal correctly,

    KTable<K1,V1> table1 = ...
    KTable<K2,V2> table2 = ...

    KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that the `joinedTable` has the same key as the left input table. IMHO, this does not work, because if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign key join), the result table would only contain a single join result, not multiple.
Example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use the table2 value as a foreign key to the table1 key (i.e., "A" joins). If the result key is the same as the key of table1, the result can be either <A, join(X,1)> or <A, join(X,2)>, but not both. Because they share the same key, whichever result record we emit later overwrites the previous result.
This is the reason why Jan originally proposed to use a combination of both primary keys of the input tables as the key of the output table. This makes the keys of the output table unique, and we can store both results in the output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>

Thoughts?
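To make the overwrite problem in the example concrete, here is a minimal, dependency-free Java sketch. It is not the Kafka Streams API: a `LinkedHashMap` stands in for a materialized KTable (a put on an existing key replaces the old row), and the names `FkJoinKeying`, `RightValue`, `join`, `keyedByLeftKey` and `keyedByCombinedKey` are hypothetical. It replays the example inputs and keys the join results both ways.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration, not the Kafka Streams API: a Map stands in for a
 * materialized KTable, so a put with an existing key overwrites the old row.
 */
public class FkJoinKeying {

    /** A table2 row: the foreign key into table1 plus its own payload. */
    record RightValue(String foreignKey, int payload) {}

    /** Stand-in for a ValueJoiner; here it only formats both sides. */
    static String join(String left, int right) {
        return "join(" + left + "," + right + ")";
    }

    /** Keys each result by table1's key only: later results overwrite earlier ones. */
    static Map<String, String> keyedByLeftKey(Map<String, String> table1,
                                              Map<String, RightValue> table2) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, RightValue> e : table2.entrySet()) {
            String fk = e.getValue().foreignKey();
            String leftValue = table1.get(fk);
            if (leftValue != null) {
                result.put(fk, join(leftValue, e.getValue().payload()));
            }
        }
        return result;
    }

    /** Keys each result by "leftKey-rightKey", so every matching table2 row survives. */
    static Map<String, String> keyedByCombinedKey(Map<String, String> table1,
                                                  Map<String, RightValue> table2) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, RightValue> e : table2.entrySet()) {
            String fk = e.getValue().foreignKey();
            String leftValue = table1.get(fk);
            if (leftValue != null) {
                result.put(fk + "-" + e.getKey(), join(leftValue, e.getValue().payload()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = new LinkedHashMap<>();
        table1.put("A", "X");
        Map<String, RightValue> table2 = new LinkedHashMap<>();
        table2.put("a", new RightValue("A", 1));
        table2.put("b", new RightValue("A", 2));

        System.out.println(keyedByLeftKey(table1, table2));     // {A=join(X,2)}
        System.out.println(keyedByCombinedKey(table1, table2)); // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

Keying by table1's key alone keeps only the last result, while the combined `A-a`/`A-b` key keeps one row per matching table2 record, which is the property the combined-key proposal relies on.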
-Matthias

On 9/4/18 1:36 PM, Jan Filipiak wrote:

Just one remark here. The high-watermark could be disregarded. The decision about the forward depends on the size of the aggregated map. Only single-element maps would be unpacked and forwarded. Zero-element maps would be published as a delete. Any other count of map entries is in the "waiting for correct deletes to arrive" state.
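The forwarding rule in Jan's remark can be sketched as a small decision function. This assumes, hypothetically, that the aggregate for one output key is a `Map` of candidate values; `ForwardDecision`, `Action`, `Decision` and `decide` are illustrative names, not part of Kafka Streams.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the map-size forwarding rule; not Kafka Streams code. */
public class ForwardDecision {

    enum Action { FORWARD, DELETE, WAIT }

    /** What to do with one aggregated map, plus the value to forward (if any). */
    record Decision<V>(Action action, Optional<V> value) {}

    static <K, V> Decision<V> decide(Map<K, V> aggregated) {
        switch (aggregated.size()) {
            case 0:
                // Nothing left in the aggregate: publish a delete (tombstone).
                return new Decision<>(Action.DELETE, Optional.empty());
            case 1:
                // Exactly one candidate: unpack it and forward it downstream.
                return new Decision<>(Action.FORWARD,
                        Optional.ofNullable(aggregated.values().iterator().next()));
            default:
                // Several candidates: wait for the outstanding deletes to arrive.
                return new Decision<>(Action.WAIT, Optional.empty());
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of()).action());                       // DELETE
        System.out.println(decide(Map.of("k1", "v1")).action());             // FORWARD
        System.out.println(decide(Map.of("k1", "v1", "k2", "v2")).action()); // WAIT
    }
}
```

Under this rule no separate highwater value is consulted: the map size alone decides whether to forward, delete, or hold.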
On 04.09.2018 21:29, Adam Bellemare wrote:

It does look like I could replace the second repartition store and highwater store with a groupBy and reduce. However, it looks like I would still need to store the highwater value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...).
>> >> This >> >> > > in >> >> > > > > > effect >> >> > > > > > >> is >> >> > > > > > >>>>>>> the >> >> > > > > > >>>>>>>>> same >> >> > > > > > >>>>>>>>>>>> as >> >> > > > > > >>>>>>>>>>>>>>>>>>> the >> >> > > > > > >>>>>>>>>>>>>>>>>>> design I >> >> > > > > > >>>>>>>>>>>>>>>>>>> > have >> >> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> now, >> >> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> just with the two >> >> tables >> >> > > > merged >> >> > > > > > >>>>> together. >> >> > > > > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>>>> > >> >> > > > > > >>>>>>>>>>>>>>>>>>> > >> >> > > > > > >>>>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>> -- >> >> > > > > > >>>>>>>>>>>>>>>>> -- Guozhang >> >> > > > > > >>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>> -- >> >> > > > > > >>>>>>>>>>>>>>>> -- Guozhang >> >> > > > > > >>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>>> >> >> > > > > > >>>>>>>>>>> >> >> > > > > > >>>>>>>>>> >> >> > > > > > >>>>>>>>> >> >> > > > > > >>>>>>>> >> >> > > > > > >>>>>>> >> >> > > > > > >>>>>> >> >> > > > > > >>>>> >> >> > > > > > >>>> >> >> > > > > > >>> >> >> > > > > > >> >> >> > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> >> > >> >> > >> >> > -- >> >> > -- Guozhang >> >> > >> >> >> > >> >> -- >> -- Guozhang >> >