Hi John,

Just made a pass on your diagram (nice hand-drawing btw!), and obviously we are thinking about the same thing :) A neat difference that I like is that in the pre-join repartition topic we can still send messages in the format `K=k, V=(i=2)` while using "i" as the partition key in a StreamPartitioner. This way we do not even need to augment the key for the repartition topic; we just project the value down to its foreign-key part and trim all other fields. As long as we still materialize the store as `A-2`, co-located with the right KTable, that is fine.
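Here is a minimal Java sketch of the projection-plus-partitioning idea, under stated assumptions. The LHS value is modeled as a field map, "i" is the foreign-key field, and `FkOnlyValue`, `project`, and `partitionFor` are hypothetical names, not Kafka APIs. The record key stays `k`, the value is trimmed to the foreign key, and the partition is derived from that foreign key. In Kafka Streams, the routing part would live in a `StreamPartitioner` implementation, whose `partition` method receives both the key and the value.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical value type for the pre-join repartition topic: after the
// projection, only the foreign key ("i" in the example above) is left.
final class FkOnlyValue {
    final String foreignKey;

    FkOnlyValue(String foreignKey) {
        this.foreignKey = Objects.requireNonNull(foreignKey);
    }
}

final class PreJoinRepartitionSketch {
    // Projection: the record key stays K=k; the value shrinks to V=(i=fk).
    static FkOnlyValue project(Map<String, String> lhsValue, String fkField) {
        return new FkOnlyValue(lhsValue.get(fkField));
    }

    // Partitioner logic: route by the foreign key carried in the value and ignore
    // the record key. floorMod(hashCode) is an illustrative stand-in, NOT Kafka's
    // default murmur2-based partitioning.
    static int partitionFor(String recordKey, FkOnlyValue value, int numPartitions) {
        return Math.floorMod(value.foreignKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        FkOnlyValue a = project(Map.of("i", "2", "name", "alpha", "payload", "lots of bytes"), "i");
        FkOnlyValue b = project(Map.of("i", "2", "name", "beta"), "i");
        // Records keyed "A" and "B" both reference RHS key "2", so they land on
        // the same partition even though their record keys differ.
        System.out.println(partitionFor("A", a, 8) == partitionFor("B", b, 8)); // prints true
    }
}
```

The `floorMod(hashCode)` hash is only a placeholder. For the trimmed records to be co-located with the right KTable, the partitioner has to reproduce exactly how the right table's topic is partitioned. The sketch does not attempt that.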
As I mentioned in my previous email, I also think this has a few advantages in saving over-the-wire bytes as well as disk bytes.

Guozhang

On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io> wrote:
> Hi Guozhang,
>
> Thanks for taking a look! I think Adam's already addressed your questions as well as I could have.
>
> Hi Adam,
>
> Thanks for updating the KIP. It looks great, especially how all the need-to-know information is right at the top, followed by the details.
>
> Also, thanks for that high-level diagram. Actually, now that I'm looking at it, I think part of my proposal got lost in translation, although I do think that what you have there is also correct.
>
> I sketched up a crude diagram based on yours and attached it to the KIP (I'm not sure if attached or inline images work on the mailing list):
> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> It's also attached to this email for convenience.
>
> Hopefully, you can see how it's intended to line up, and which parts are modified.
> At a high level, instead of performing the join on the right-hand side, we're essentially just registering interest, like "LHS key A wishes to receive updates for RHS key 2". Then, when there is a new "interest" or any update to the RHS records, the RHS "broadcasts" its state back to the LHS records that are interested in it.
>
> Thus, instead of sending the LHS values to the RHS joiner workers and then sending the join results back to the LHS worker to be co-partitioned and validated, we instead only send the LHS *keys* to the RHS workers and then only the RHS k/v back to be joined by the LHS worker.
>
> I've been considering both your diagram and mine, and I *think* what I'm proposing has a few advantages.
>
> Here are some points of interest as you look at the diagram:
> * When we extract the foreign key and send it to the Pre-Join Repartition Topic, we can send only the FK/PK pair.
> There's no need to worry about custom partitioner logic, since we can just use the foreign key plainly as the repartition record key. Also, we save on transmitting the LHS value, since we only send its key in this step.
> * We also only need to store the RHSKey:LHSKey mapping in the MaterializedSubscriptionStore, saving on disk. We can use the same RocksDB key format you proposed and the same algorithm involving range scans when the RHS records get updated.
> * Instead of joining on the right side, all we do is compose a re-repartition record so we can broadcast the RHS k/v pair back to the original LHS partition. (This is what the "rekey" node is doing.)
> * Then, there is a special kind of Joiner that's co-resident in the same StreamTask as the LHS table, subscribed to the Post-Join Repartition Topic.
> ** This Joiner is *not* triggered directly by any changes in the LHS KTable. Instead, LHS events indirectly trigger the join via the whole lifecycle.
> ** For each event arriving from the Post-Join Repartition Topic, the Joiner looks up the corresponding record in the LHS KTable. It validates the FK as you noted, discarding any inconsistent events. Otherwise, it unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the join result.
> ** Note that the Joiner itself is stateless, so materializing the join result is optional, just as with the 1:1 joins.
>
> So in summary:
> * instead of transmitting the LHS keys and values to the right and the JoinResult back to the left, we only transmit the LHS keys to the right and the RHS values to the left. Assuming the average RHS value is smaller than or equal to the average join result size, it's a clear win on broker traffic. I think this is actually a reasonable assumption, which we can discuss more if you're suspicious.
> * we only need one copy of the data (the left and right tables need to be materialized) and one extra copy of the PK:FK pairs in the Materialized Subscription Store. Materializing the join result is optional, just as with the existing 1:1 joins.
> * we still need the fancy range-scan algorithm on the right to locate all interested LHS keys when a RHS value is updated, but we don't need a custom partitioner for either repartition topic (this is of course a modification we could make to your version as well)
>
> How does this sound to you? (And did I miss anything?)
> -John
>
> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>> Hi John & Guozhang
>>
>> @John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the KIP, pruned much of what I wrote and put a simplified diagram near the top to illustrate the workflow. I encapsulated Jan's content at the bottom of the document. I believe it is simpler to read by far now.
>>
>> @Guozhang Wang <wangg...@gmail.com>:
>> > #1: rekey left table
>> > -> source from the left upstream, send to rekey-processor to generate combined key, and then sink to copartition topic.
>> Correct.
>>
>> > #2: first-join with right table
>> > -> source from the right table upstream, materialize the right table.
>> > -> source from the co-partition topic, materialize the rekeyed left table, join with the right table, rekey back, and then sink to the rekeyed-back topic.
>> Almost - I cleared up the KIP. We do not rekey back yet, as I need the Foreign-Key value generated in #1 above to compare in the resolution stage.
>>
>> > #3: second join
>> > -> source from the rekeyed-back topic, materialize the rekeyed back table.
>> > -> source from the left upstream, materialize the left table, join with the rekeyed back table.
>> Almost - As each event comes in, we just run it through a stateful processor that checks the original ("This") KTable for the key.
>> The value payload then has the foreignKeyExtractor applied again as in Part #1 above, and gets the current foreign key. Then we compare it to the joined event that we are currently resolving. If they have the same foreign key, propagate the result out. If they don't, throw the event away.
>>
>> The end result is that we do need to materialize 2 additional tables (left/this-combinedkey table, and the final Joined table) as I've illustrated in the updated KIP. I hope the diagram clears it up a lot better. Please let me know.
>>
>> Thanks again
>> Adam
>>
>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>> John,
>>>
>>> Thanks a lot for the suggestions on refactoring the wiki. I agree with you that the KIP proposal should be easy to understand for anyone reading it in the future, and hence should provide a good summary of the user-facing interfaces, as well as rejected alternatives that briefly capture "how we came to this conclusion, and what we have argued, disagreed, and agreed about, etc." so that readers do not need to dig into the DISCUSS thread to get all the details. We can, of course, keep implementation details like "workflows" on the wiki page as an addendum section, since they are also relevant.
>>>
>>> Regarding your proposal on comment 6): that's a very interesting idea! Just to check that I understand it correctly: the proposal's resulting topology is still the same as the current proposal, where we will have 3 sub-topologies for this operator:
>>>
>>> #1: rekey left table
>>> -> source from the left upstream, send to rekey-processor to generate combined key, and then sink to copartition topic.
>>>
>>> #2: first-join with right table
>>> -> source from the right table upstream, materialize the right table.
>>> -> source from the co-partition topic, materialize the rekeyed left table, join with the right table, rekey back, and then sink to the rekeyed-back topic.
>>>
>>> #3: second join
>>> -> source from the rekeyed-back topic, materialize the rekeyed-back table.
>>> -> source from the left upstream, materialize the left table, join with the rekeyed-back table.
>>>
>>> Sub-topologies #1 and #3 may be merged into a single sub-topology since both of them read from the left table source stream. In this workflow, we need to materialize 4 tables (left table in #3, right table in #2, rekeyed left table in #2, rekeyed-back table in #3) and use 2 repartition topics (copartition topic, rekeyed-back topic).
>>>
>>> Compared with Adam's current proposal in the workflow overview, it has the same number of materialized tables (left table, rekeyed left table, right table, out-of-ordering resolver table) and the same number of internal topics (two). The advantage is that on the copartition topic we can save bandwidth by not sending values, and in #2 the rekeyed left table is smaller since we do not have any values to materialize. Is that right?
>>>
>>> Guozhang
>>>
>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:
>>>> Hi Adam,
>>>>
>>>> Given that the committers are all pretty busy right now, I think that it would help if you were to refactor the KIP a little to reduce the workload for reviewers.
>>>>
>>>> I'd recommend the following changes:
>>>> * relocate all internal details to a section at the end called something like "Implementation Notes".
>>>> * rewrite the rest of the KIP to be as succinct as possible and mention only publicly-facing API changes.
>>>> ** for example, the interface that you've already listed there, as well as a textual description of the guarantees we'll be providing (join result is copartitioned with the LHS, and the join result is guaranteed correct)
>>>>
>>>> A good target would be that the whole main body of the KIP, including Status, Motivation, Proposal, Justification, and Rejected Alternatives, all fit "above the fold" (i.e., all fit on the screen at a comfortable zoom level).
>>>> I think the only real Rejected Alternative that bears mention at this point is KScatteredTable, for which you could include just the executive summary (no implementation details) and link to extra details in the Implementation Notes section.
>>>>
>>>> Taking a look at the wiki page, ~90% of the text there is internal detail, which is useful for skeptical readers, but doesn't need to be ratified in a vote (and would be subject to change without notice in the future anyway).
>>>> There's also a lot of conflicting discussion, as you've very respectfully tried to preserve the original proposal from Jan while adding your own.
>>>> Isolating all this information in a dedicated section at the bottom frees the voters up to focus on the public API part of the proposal, which is really all they need to consider.
>>>>
>>>> Plus, it'll be clear to future readers which parts of the document are enduring, and which parts are a snapshot of our implementation thinking at the time.
>>>>
>>>> I'm suggesting this because I suspect that the others haven't made time to review it partly because it seems daunting. If it seems like it would be a huge time investment to review, people will just keep putting it off. But if the KIP is a single page, then they'll be more inclined to give it a read.
>>>>
>>>> Honestly, I don't think the KIP itself is that controversial (apart from the scattered table thing; sorry, Jan). Most of the discussion has been around the implementation, which we can continue more effectively in a PR once the KIP has passed.
>>>>
>>>> How does that sound?
>>>> -John
>>>>
>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>> 1) I believe that the resolution mechanism John has proposed is sufficient - it is clean and easy and doesn't require additional RocksDB stores, which reduces the footprint greatly. I don't think we need to resolve based on timestamp or offset anymore, but if we decide to do that, it would be within the bounds of the existing API.
>>>>>
>>>>> 2) Is the current API sufficient, or does it need to be altered to go back to vote?
>>>>>
>>>>> 3) KScatteredTable implementation can always be added in a future revision. This API does not rule it out. The implementation of this function would simply be replaced with `KScatteredTable.resolve()` while still maintaining the existing API, thereby giving both features as Jan outlined earlier. Would this work?
>>>>>
>>>>> Thanks Guozhang, John and Jan
>>>>>
>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io> wrote:
>>>>>> Hi, all,
>>>>>>
>>>>>> >> In fact, we can just keep a single final-result store with timestamps and reject values that have a smaller timestamp, is that right?
>>>>>>
>>>>>> > Which output is correct should at least be decided by the offset of the original message.
>>>>>>
>>>>>> Thanks for this point, Jan.
>>>>>>
>>>>>> KIP-258 is merely to allow embedding the record timestamp in the k/v store, as well as providing a storage-format upgrade path.
>>>>>>
>>>>>> I might have missed it, but I think we have yet to discuss whether it's safe or desirable just to swap topic-ordering out for timestamp-ordering. This is a very deep topic, and I think it would only pollute the current discussion.
>>>>>>
>>>>>> What Adam has proposed is safe, given the *current* ordering semantics of the system. If we can agree on his proposal, I think we can merge the feature well before the conversation about timestamp ordering even takes place, much less reaches a conclusion. In the meantime, it would seem to be unfortunate to have one join operator with different ordering semantics from every other KTable operator.
>>>>>>
>>>>>> If and when that timestamp discussion takes place, many (all?) KTable operations will need to be updated, rendering the many:one join a small marginal cost.
>>>>>>
>>>>>> And, just to plug it again, I proposed an algorithm above that I believe provides correct ordering without any additional metadata, and regardless of the ordering semantics. I didn't bring it up further, because I felt the KIP only needs to agree on the public API, and we can discuss the implementation at leisure in a PR...
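Here is a minimal sketch of the foreign-key check described earlier in the thread. The LHS-side joiner looks up the current LHS record, re-extracts its foreign key, and discards the event if the key no longer matches. This check is what lets the approach avoid extra metadata. The `LhsRow` type, the String values, and the string-concatenating stand-in for the `ValueJoiner` are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical LHS row: the foreign key it currently points at, plus a payload.
final class LhsRow {
    final String fk;
    final String payload;

    LhsRow(String fk, String payload) {
        this.fk = fk;
        this.payload = payload;
    }
}

final class FkResolverSketch {
    // Stand-in for the materialized LHS KTable in the same task as the joiner.
    final Map<String, LhsRow> lhsTable = new HashMap<>();

    // Called for each (lhsKey, rhsKey, rhsValue) record arriving from the
    // post-join repartition topic. Returns the join result, or empty if stale.
    Optional<String> onRhsRecord(String lhsKey, String rhsKey, String rhsValue) {
        LhsRow current = lhsTable.get(lhsKey);
        if (current == null) {
            return Optional.empty(); // LHS record was deleted in the meantime
        }
        if (!current.fk.equals(rhsKey)) {
            return Optional.empty(); // LHS now references a different RHS key: discard
        }
        // Stand-in for the user's ValueJoiner.
        return Optional.of(current.payload + "+" + rhsValue);
    }

    public static void main(String[] args) {
        FkResolverSketch joiner = new FkResolverSketch();
        joiner.lhsTable.put("A", new LhsRow("1", "a")); // A points at RHS key 1...
        joiner.lhsTable.put("A", new LhsRow("2", "a")); // ...then is quickly corrected to 2
        // The two responses may arrive in either order; only the matching one survives.
        System.out.println(joiner.onRhsRecord("A", "2", "two")); // prints Optional[a+two]
        System.out.println(joiner.onRhsRecord("A", "1", "one")); // prints Optional.empty
    }
}
```

The check compares foreign keys only, so it needs neither timestamps nor offsets. It does not model ordering among several updates to the same RHS key; that part is left out of this sketch.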
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>
>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote:
>>>>>>>> Hello Adam / Jan / John,
>>>>>>>>
>>>>>>>> Sorry for being late on this thread! I finally got some time this weekend to clean up a load of tasks in my queue (actually I've also realized there are a bunch of other things I need to enqueue while cleaning them up; something I need to improve on my side). So here are my thoughts:
>>>>>>>>
>>>>>>>> Regarding the APIs: I like the API currently written in the KIP. More generally, I'd prefer to keep 1) one-to-many join functionality and 2) join types other than inner as separate KIPs, since 1) may be worth a general API refactoring that benefits not only foreign-key joins but co-located joins as well (e.g. an extended proposal of https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup ), and I'm not sure other join types would actually be needed (maybe left join still makes sense), so it's better to wait-for-people-to-ask-and-add than add-something-that-no-one-uses.
>>>>>>>>
>>>>>>>> Regarding whether we enforce step 3) / 4) vs.
>>>>>>>> introducing a KScatteredTable for users to inject their own optimization: I'd prefer to do the current option as-is, and my main rationale is leaving room for optimization inside the Streams internals, plus API succinctness. For advanced users who may indeed prefer KScatteredTable to do their own optimization, and for whom using the Processor API directly is too much work, I think we can still extend the current API to support it in the future if it becomes necessary.
>>>>>>>
>>>>>>> no internal optimization potential. it's a myth
>>>>>>>
>>>>>>> ¯\_(ツ)_/¯
>>>>>>>
>>>>>>> :-)
>>>>>>>
>>>>>>>> Another note about step 4), resolving out-of-order data: as I mentioned before, I think with KIP-258 (embedded timestamp with key-value store) we can actually make this step simpler than the current proposal. In fact, we can just keep a single final-result store with timestamps and reject values that have a smaller timestamp, is that right?
>>>>>>>
>>>>>>> Which output is correct should at least be decided by the offset of the original message.
>>>>>>>
>>>>>>>> That's all I have in mind now. Again, great appreciation to Adam for making such HUGE progress on this KIP!
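Here is a minimal sketch of the alternative Guozhang floats. It is a final-result store that keeps a timestamp next to each value (the kind of layout KIP-258 enables) and rejects any update whose timestamp is smaller than the stored one. The class and its methods are hypothetical and are not the KIP-258 store interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical final-result store that keeps the record timestamp next to each
// value and rejects updates that are strictly older than what it holds.
final class TimestampedResultStoreSketch {
    private static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Returns true if the update was applied, false if it was rejected as older.
    boolean put(String key, String value, long timestamp) {
        Entry existing = store.get(key);
        if (existing != null && timestamp < existing.timestamp) {
            return false;
        }
        store.put(key, new Entry(value, timestamp));
        return true;
    }

    String get(String key) {
        Entry e = store.get(key);
        return e == null ? null : e.value;
    }

    public static void main(String[] args) {
        TimestampedResultStoreSketch results = new TimestampedResultStoreSketch();
        results.put("A", "A joined with RHS 2", 200L);                   // newer result arrives first
        boolean applied = results.put("A", "A joined with RHS 1", 100L); // stale one arrives late
        System.out.println(applied + " / " + results.get("A")); // prints false / A joined with RHS 2
    }
}
```

With `<`, an update carrying an equal timestamp still replaces the stored value. This is where Jan's objection applies. The timestamp order of results need not match the offset order of the original LHS updates, so "largest timestamp wins" can keep a different result than "latest offset wins".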
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>> If they don't find the time:
>>>>>>>>> They usually take the opposite path from me :D
>>>>>>>>> so the answer would be clear.
>>>>>>>>>
>>>>>>>>> hence my suggestion to vote.
>>>>>>>>>
>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote:
>>>>>>>>>> Hi Guozhang and Matthias
>>>>>>>>>>
>>>>>>>>>> I know both of you are quite busy, but we've gotten this KIP to a point where we need more guidance on the API (perhaps a bit of a tie-breaker, if you will). If you think anyone else should look at this, please tag them accordingly.
>>>>>>>>>>
>>>>>>>>>> The scenario is as such:
>>>>>>>>>>
>>>>>>>>>> Current Option:
>>>>>>>>>> API: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>>>>>>>>>> 1) Rekey the data to CombinedKey, and shuffle it to the partition with the foreignKey (repartition 1)
>>>>>>>>>> 2) Join the data
>>>>>>>>>> 3) Shuffle the data back to the original node (repartition 2)
>>>>>>>>>> 4) Resolve out-of-order arrival / race condition due to foreign-key changes.
>>>>>>>>>>
>>>>>>>>>> Alternate Option:
>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable.
>>>>>>>>>> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a user would be able to perform additional functions directly from the KScatteredTable (TBD - currently out of scope).
>>>>>>>>>> - John's analysis 2 emails up is accurate as to the tradeoffs.
>>>>>>>>>>
>>>>>>>>>> Current Option is coded as-is. Alternate Option is possible, but will require implementation details to be made part of the API, and some exposure of new data structures in the API (i.e., CombinedKey).
>>>>>>>>>>
>>>>>>>>>> I appreciate any insight into this.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>> Hi John
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your feedback and assistance. I think your summary is accurate from my perspective. Additionally, I would like to add that there is a risk of inconsistent final states without performing the resolution. This is a major concern for me, as most of the data I have dealt with is produced by relational databases. We have seen a number of cases where a user in the Rails UI has modified the field (foreign key), realized they made a mistake, and then updated the field again with a new key.
>>>>>>>>>>> The events are propagated out as they are produced, and as such we have had real-world cases where these inconsistencies were propagated downstream as the final values due to the race conditions in the fan-out of the data.
>>>>>>>>>>>
>>>>>>>>>>> This solution that I propose values correctness of the final result over other factors.
>>>>>>>>>>>
>>>>>>>>>>> We could always move this function over to using a KScatteredTable implementation in the future, and simply deprecate this join API in time. I think I would like to hear more from some of the other major committers on which course of action they would think is best before any more coding is done.
>>>>>>>>>>>
>>>>>>>>>>> Thanks again
>>>>>>>>>>>
>>>>>>>>>>> Adam
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>> Hi Jan and Adam,
>>>>>>>>>>>>
>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your performance experience as well, Jan. I agree that avoiding unnecessary join outputs is especially important when the fan-out is so high. I suppose this could also be built into the implementation we're discussing, but it wouldn't have to be specified in the KIP (since it's an API-transparent optimization).
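When an RHS record changes, the right side has to find every LHS key currently subscribed to that foreign key. This is the fan-out. If the subscription store's key puts the foreign key first, that lookup becomes a prefix (range) scan over sorted keys. That is the operation Adam benchmarks as `prefixScan` in his Dec 3 message. The sketch below assumes a `TreeMap` as a stand-in for RocksDB's sorted key order and a hypothetical `"<fk>|<pk>"` string key. Neither is the KIP's actual key format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Stand-in for the RHS-side subscription store. RocksDB iterates keys in sorted
// byte order; a TreeMap gives the same "sorted keys + range scan" shape here.
final class SubscriptionStoreSketch {
    // Hypothetical combined-key encoding "<fk>|<pk>". Assumes '|' never occurs
    // inside a foreign key; a real store would use a length-prefixed binary key.
    private static final char SEP = '|';
    private final TreeMap<String, Boolean> store = new TreeMap<>();

    void subscribe(String fk, String pk) {
        store.put(fk + SEP + pk, Boolean.TRUE);
    }

    void unsubscribe(String fk, String pk) {
        store.remove(fk + SEP + pk);
    }

    // Prefix scan: every key in ["fk|", "fk}") starts with "fk|",
    // because '}' is the character immediately after '|'.
    List<String> subscribersOf(String fk) {
        String from = fk + SEP;
        String to = fk + (char) (SEP + 1);
        List<String> pks = new ArrayList<>();
        for (String key : store.subMap(from, true, to, false).keySet()) {
            pks.add(key.substring(from.length()));
        }
        return pks;
    }

    public static void main(String[] args) {
        SubscriptionStoreSketch subs = new SubscriptionStoreSketch();
        subs.subscribe("2", "A");
        subs.subscribe("2", "B");
        subs.subscribe("20", "C"); // shares the leading "2" but not the "2|" prefix
        subs.subscribe("1", "D");
        // An update to RHS key "2" must fan out to exactly A and B.
        System.out.println(subs.subscribersOf("2")); // prints [A, B]
    }
}
```

Adam's measurements suggest the scan itself is cheap and that processing the matches dominates. In this sketch, each key returned by `subscribersOf` is one join result that would have to be recomputed and emitted.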
>>>>>>>>>>>>
>>>>>>>>>>>> As far as whether or not to re-repartition the data, I didn't bring it up because it sounded like the two of you agreed to leave the KIP as-is, despite the disagreement.
>>>>>>>>>>>>
>>>>>>>>>>>> If you want my opinion, I feel like both approaches are reasonable. It sounds like Jan places more value on the potential for developers to optimize their topologies to re-use the intermediate nodes, whereas Adam places more value on having a single operator that people can use without extra steps at the end.
>>>>>>>>>>>>
>>>>>>>>>>>> Personally, although I do find it exceptionally annoying when a framework gets in my way when I'm trying to optimize something, it seems better to go for a single operation.
>>>>>>>>>>>> * Encapsulating the internal transitions gives us significant latitude in the implementation (for example, joining only at the end, not in the middle, to avoid extra data copying and out-of-order resolution; how we represent the first repartition keys (combined keys vs. value vectors), etc.). If we publish something like a KScatteredTable with the right-partitioned joined data, then the API pretty much locks in the implementation as well.
>>>>>>>>>>>> * The API seems simpler to understand and use.
>>>>>>>>>>>> I do mean "seems"; if anyone wants to make the case that KScatteredTable is actually simpler, I think hypothetical usage code would help. From a relational algebra perspective, it seems like KTable.join(KTable) should produce a new KTable in all cases.
>>>>>>>>>>>> * That said, there might still be room in the API for a different operation like what Jan has proposed to scatter a KTable, and then do things like join, re-group, etc. from there... I'm not sure; I haven't thought through all the consequences yet.
>>>>>>>>>>>>
>>>>>>>>>>>> This is all just my opinion after thinking over the discussion so far...
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>>>> Updated the PR to take into account John's feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I did some preliminary testing for the performance of the prefixScan. I have attached the file, but I will also include the text in the body here for archival purposes (I am not sure what happens to attached files). I also updated the PR and the KIP accordingly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning large numbers of records.
>>>>>>>>>>>>> As Jan mentioned previously, the real issue would be more around processing the resulting records after obtaining them. For instance, it takes approximately 80-120 ms to flush the buffer and a further 35-85 ms to scan 27.5M records, obtaining matches for 2.5M of them. Iterating through the records just to generate a simple count takes ~40 times longer than the flush + scan combined.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ========================================
>>>>>>>>>>>>> Setup:
>>>>>>>>>>>>> ========================================
>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
>>>>>>>>>>>>> CPU: i7 2.2 GHz.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams RocksDB implementation (RocksDB.java, basically just avoiding the ProcessorContext). There are no modifications to the default RocksDB values provided in the 2.1/trunk release.
keysize = 128 bytes
valsize = 512 bytes

Step 1: Write X positive matching events (key = prefix + left-padded auto-incrementing integer).
Step 2: Write 10X negative matching events (key = left-padded auto-incrementing integer).
Step 3: Perform a flush.
Step 4: Perform the prefixScan.
Step 5: Iterate through the returned Iterator and validate the count of expected events.

============================================================================================
Results:
============================================================================================
X      Total events  Run    Flush time  Scan time  Disk used
1k     11k           -      39 ms       7 ms       6.9 MB
10k    110k          -      45 ms       8 ms       127 MB
100k   1.1M          Test1  60 ms       12 ms      678 MB
100k   1.1M          Test2  45 ms       7 ms       576 MB
1M     11M           Test1  52 ms       19 ms      7.2 GB
1M     11M           Test2  84 ms       34 ms      9.1 GB
2.5M   27.5M         Test1  82 ms       63 ms      17 GB (276 SST files)
2.5M   27.5M         Test2  116 ms      35 ms      23 GB (361 SST files)
2.5M   27.5M         Test3  103 ms      82 ms      19 GB (300 SST files)
--------------------------------------------------------------------------------------------

I had to limit my testing on my laptop to X = 2.5M events.
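The five benchmark steps can be approximated in memory to check the scan logic itself. This is a minimal sketch, not RocksDB: a sorted Python list stands in for the flushed, sorted key space, and the `fk-` prefix, the 10-digit padding width, and the function names are assumptions for illustration. Because RocksDB orders keys bytewise by default, a prefix scan reduces to seeking to the first key >= the prefix and reading forward until a key no longer starts with it.

```python
import bisect
from typing import Iterator, List


def make_key(i: int, prefix: str = "", width: int = 10) -> str:
    # Left-pad the auto-incrementing integer so lexicographic order matches numeric order.
    return prefix + str(i).zfill(width)


def prefix_scan(sorted_keys: List[str], prefix: str) -> Iterator[str]:
    # Seek to the first key >= prefix, then read forward while keys still match.
    i = bisect.bisect_left(sorted_keys, prefix)
    while i < len(sorted_keys) and sorted_keys[i].startswith(prefix):
        yield sorted_keys[i]
        i += 1


def run_benchmark(x: int, prefix: str = "fk-") -> int:
    positives = [make_key(i, prefix) for i in range(x)]   # Step 1: X matching keys
    negatives = [make_key(i) for i in range(10 * x)]      # Step 2: 10X non-matching keys
    store = sorted(positives + negatives)                 # Step 3: "flush" to a sorted store
    return sum(1 for _ in prefix_scan(store, prefix))     # Steps 4-5: scan and count


print(run_benchmark(1_000))  # 1000
```

The sketch only validates the count; it says nothing about flush or scan latency, which depend on RocksDB itself.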
I tried to go to X = 10M (110M events), but RocksDB was going into the 100GB+ range and my laptop ran out of disk. More extensive testing could be done, but I suspect that it would be in line with what we're seeing in the results above.

At this point in time, I think the only major discussion point is really around what Jan and I have disagreed on: repartitioning back and resolving potential out-of-order issues, or leaving that up to the client to handle.

Thanks folks,

Adam

On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

On 29.11.2018 15:14, John Roesler wrote:
> Hi all,
>
> Sorry that this discussion petered out... I think the 2.1 release caused an extended distraction that pushed it off everyone's radar (which was precisely Adam's concern). Personally, I've also had some extended distractions of my own that kept (and continue to keep) me preoccupied.
>
> However, calling for a vote did wake me up, so I guess Jan was on the right track!
>
> I've gone back and reviewed the whole KIP document and the prior discussion, and I'd like to offer a few thoughts:
>
> API Thoughts:
>
> 1. If I read the KIP right, you are proposing a many-to-one join. Could we consider naming it manyToOneJoin? Or, if you prefer, flip the design around and make it a oneToManyJoin?
>
> The proposed name "joinOnForeignKey" disguises the join type, and it seems like it might trick some people into using it for a one-to-one join. This would work, of course, but it would be super inefficient compared to a simple rekey-and-join.
>
> 2. I might have missed it, but I don't think it's specified whether it's an inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ) the rest can be achieved by filtering or by handling it in the ValueJoiner.
>
> 3. The arg list to joinOnForeignKey doesn't look quite right.
> 3a. Regarding Serialized: There are a few different paradigms in play in the Streams API, so it's confusing, but instead of three Serialized args, I think it would be better to have one that allows (optionally) setting the 4 incoming serdes. The result serde is defined by the Materialized. The incoming serdes can be optional because they might already be available on the source KTables, or the default serdes from the config might be applicable.
>
> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting one, and it seems like it might actually be harmful, since the rekey operation needs to produce results that are co-partitioned with the "other" KTable.
>
> 4. I'm fine with the "reserved word" header, but I didn't actually follow what Matthias meant about namespacing requiring "deserializing" the record header. The headers are already Strings, so I don't think that deserialization is required. If we applied the namespace at source nodes and stripped it at sink nodes, this would add practically no overhead.
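One way to picture the source/sink namespacing in point 4 is the minimal sketch below. Headers are modelled as (String key, bytes value) pairs; the `user.` and `streams.` prefixes and all function names are hypothetical. Only the key strings are rewritten, so no header value ever has to be deserialized.

```python
from typing import List, Tuple

Header = Tuple[str, bytes]
USER_NS = "user."          # hypothetical namespace for user-supplied headers
INTERNAL_NS = "streams."   # hypothetical namespace for framework-internal headers


def apply_namespace(headers: List[Header]) -> List[Header]:
    """Source node: prefix every user header key so it cannot collide with internal keys."""
    return [(USER_NS + key, value) for key, value in headers]


def add_internal(headers: List[Header], name: str, value: bytes) -> List[Header]:
    """Framework code attaches its own header under a separate namespace."""
    return headers + [(INTERNAL_NS + name, value)]


def strip_namespace(headers: List[Header]) -> List[Header]:
    """Sink node: drop internal headers and restore the user's original keys."""
    return [(key[len(USER_NS):], value) for key, value in headers if key.startswith(USER_NS)]


# A user header named "offset" survives even though the framework uses the same name internally.
inbound = apply_namespace([("offset", b"user-data")])
internal = add_internal(inbound, "offset", b"42")
print(strip_namespace(internal))  # [('offset', b'user-data')]
```

Under this scheme users may pick any header name, at the cost of a string prefix/strip per header at the topology boundaries.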
> The advantage of the namespace idea is that no public API change wrt headers needs to happen, and no restrictions need to be placed on users' headers.
>
> (Although I'm wondering if we can get away without the header at all... stay tuned)
>
> 5. I also didn't follow the discussion about the HWM table growing without bound. As I read it, the HWM table is effectively implementing OCC (optimistic concurrency control) to resolve the problem you noted with disordering when the rekey is reversed... particularly notable when the FK changes. As such, it only needs to track the most recent "version" (the offset in the source partition) of each key. Therefore, it should have the same number of keys as the source table at all times.
>
> I see that you are aware of KIP-258, which I think might be relevant in a couple of ways. One: it's just about storing the timestamp in the state store, but the ultimate idea is to effectively use the timestamp as an OCC "version" to drop disordered updates.
> You wouldn't want to use the timestamp for this operation, but if you were to use a similar mechanism to store the source offset in the store alongside the re-keyed values, then you could avoid a separate table.
>
> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking at all and resolve out-of-order during a final join instead...
>
> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
>
> Left table:
> 1: (A, xyz)
> 2: (B, asd)
>
> Right table:
> A: EntityA
> B: EntityB
>
> We could do a rekey as you proposed with a combined key, but not propagating the value at all:
> Rekey table:
> A-1: (dummy value)
> B-2: (dummy value)
>
> Which we then join with the right table to produce:
> A-1: EntityA
> B-2: EntityB
>
> Which gets rekeyed back:
> 1: A, EntityA
> 2: B, EntityB
>
> And finally we do the actual join:
> Result table:
> 1: ((A, xyz), EntityA)
> 2: ((B, asd), EntityB)
>
> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
>
> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.

Will check Adam's patch, but this should work.
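The four steps of point 6 (rekey with no value, join on the FK, rekey back, final join that drops stale responses) can be sketched in a few lines. This is a minimal in-memory model, not the KIP's implementation: plain dicts stand in for the left and right KTables, and `final_join` and `fk_join` are hypothetical names. The interesting part is the last step, where a response computed for an FK that the left row no longer references is discarded.

```python
from typing import Dict, Optional, Tuple

LeftRow = Tuple[str, str]  # (foreign key, other data)


def final_join(left: Dict[int, LeftRow], pk: int, response_fk: str,
               entity: Optional[str]) -> Optional[Tuple[LeftRow, Optional[str]]]:
    """Join one right-hand response against the *current* left row; None means drop it."""
    current = left.get(pk)
    if current is None or current[0] != response_fk:
        # Left row deleted, or its FK changed since this response was computed: drop it.
        return None
    return (current, entity)


def fk_join(left: Dict[int, LeftRow], right: Dict[str, str]) -> Dict[int, Tuple[LeftRow, Optional[str]]]:
    # 1. Rekey to a combined FK-PK key, dropping the left value entirely.
    rekeyed = [(fk, pk) for pk, (fk, _) in left.items()]
    # 2. Join with the right table on the FK part only.
    joined = [(fk, pk, right.get(fk)) for fk, pk in rekeyed]
    # 3. Rekey back to the original primary key, then 4. do the final join.
    result = {}
    for fk, pk, entity in joined:
        row = final_join(left, pk, fk, entity)
        if row is not None:
            result[pk] = row
    return result


left = {1: ("A", "xyz"), 2: ("B", "asd")}
right = {"A": "EntityA", "B": "EntityB"}
print(fk_join(left, right))  # {1: (('A', 'xyz'), 'EntityA'), 2: (('B', 'asd'), 'EntityB')}

# Disordering: row 1 moves from A to B; a late response for A is recognised as stale.
left[1] = ("B", "xyz")
print(final_join(left, 1, "A", None))       # None
print(final_join(left, 1, "B", "EntityB"))  # (('B', 'xyz'), 'EntityB')
```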
As mentioned often, I am not convinced about partitioning back for the user automatically. I think this is the real performance eater ;)

> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?

Make it work, make it fast, make it beautiful. The topmost thing here is / was correctness. In practice I do not measure the performance of the range scan. In typical cases, a left-hand-side change emits 500k to 1M rows. The range scan is just the work you gotta do, even if you pack your data into different formats; RocksDB performance is usually tightly tied to the size of the data, and we can't really change that. It is more important for users to prevent useless updates to begin with. My left-hand side is guarded to drop changes that are not going to change my join output.
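A guard of this kind can be sketched as a projection followed by an equality check, so that an update touching only fields the join never reads is suppressed before it can trigger any range scan. The `Order` record, its fields, and the function names are hypothetical, chosen only to illustrate the idea.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Order:              # hypothetical left-hand-side record
    customer_id: str      # foreign key
    amount: int           # read by the ValueJoiner
    last_seen_ms: int     # never read by the join, but changes often


@dataclass(frozen=True)
class OrderView:          # projection: only the fields the join needs
    customer_id: str
    amount: int


def project(order: Optional[Order]) -> Optional[OrderView]:
    return None if order is None else OrderView(order.customer_id, order.amount)


def should_forward(old: Optional[Order], new: Optional[Order]) -> bool:
    # Dataclass equality compares field by field, like old.equals(new) on the projection.
    return project(old) != project(new)


print(should_forward(Order("c1", 10, 1), Order("c1", 10, 2)))  # False: only an unused field changed
print(should_forward(Order("c1", 10, 1), Order("c2", 10, 1)))  # True: the foreign key changed
```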
Usually it's:

drop unused fields and then don't forward if old.equals(new)

Regarding the performance of creating an iterator for smaller fan-outs, users can still just do a group by first anyway.

> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
>
> Left table:
> 1: A,...
> 2: B,...
> 3: A,...
>
> Gets re-keyed:
> A: [1, 3]
> B: [2]
>
> Then, the rhs part of the join would only need a regular single-key lookup. Of course we have to deal with the problem of large values, as there's no bound on the number of lhs records that can reference rhs records. Offhand, I'd say we could page the values, so when one row is past the threshold, we append the key for the next page.
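The paging idea can be made concrete with a small sketch. It assumes a plain dict as the store, a hypothetical page size of two keys, and a hypothetical `A#1`, `A#2`, ... naming scheme for overflow pages; removals and the retraction tracking John raises are deliberately left out. Each lookup also reports how many store reads it took, which grows with the number of LHS keys divided by the page size.

```python
from typing import Dict, List, Optional, Tuple

Page = Tuple[List[int], Optional[str]]  # (LHS keys on this page, key of the next page)


class PagedKeyIndex:
    """Maps an RHS key (the FK) to the LHS keys that reference it, in bounded-size pages."""

    def __init__(self, page_size: int = 2):
        self.page_size = page_size
        self.store: Dict[str, Page] = {}

    def add(self, fk: str, lhs_key: int) -> None:
        page_key, depth = fk, 0
        while True:
            keys, nxt = self.store.get(page_key, ([], None))
            if len(keys) < self.page_size:
                self.store[page_key] = (keys + [lhs_key], nxt)
                return
            depth += 1
            if nxt is None:
                # Page is full: link a new overflow page (hypothetical naming scheme).
                nxt = f"{fk}#{depth}"
                self.store[page_key] = (keys, nxt)
            page_key = nxt

    def lookup(self, fk: str) -> Tuple[List[int], int]:
        """Return all LHS keys for fk and the number of store reads needed."""
        result: List[int] = []
        reads, page_key = 0, fk
        while page_key is not None:
            reads += 1
            page = self.store.get(page_key)
            if page is None:
                break
            keys, page_key = page
            result.extend(keys)
        return result, reads


index = PagedKeyIndex(page_size=2)
for lhs_key, fk in [(1, "A"), (2, "B"), (3, "A")]:
    index.add(fk, lhs_key)
print(index.lookup("A"))  # ([1, 3], 1): a single-key lookup
for lhs_key in (5, 7, 9):
    index.add("A", lhs_key)
print(index.lookup("A"))  # ([1, 3, 5, 7, 9], 3): one read per page
```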
> Then in most cases, it would be a single key lookup, but for large fan-out updates, it would be one per (max value size)/(avg lhs key size).
>
> This seems more complex, though... Plus, I think there's some extra tracking we'd need to do to know when to emit a retraction. For example, when record 1 is deleted, the re-key table would just have (A: [3]). Some kind of tombstone is needed so that the join result for 1 can also be retracted.
>
> That's all!
>
> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the discussion has been slow.
> -John

On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

I'd say you can just call the vote.

That happens all the time, and if something comes up, it just goes back to discussion.

I would not expect too much attention from another email in this thread.
best Jan

On 09.10.2018 13:56, Adam Bellemare wrote:
Hello Contributors

I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.

Main points that need addressing:
1) Any way (or benefit) in structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.

2) Overall strategy for joining + resolving. This thread has much discourse between Jan and me about the current highwater mark proposal versus a groupBy + reduce proposal. I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.
3) Anything else that you see that would prevent this from moving to a vote?

Thanks

Adam

On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Jan

With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the number of segments you want and how large they are. To the best of my understanding, the segments are automatically rolled over as new data with newer timestamps arrives. We use this exact functionality in some of the work done internally at my company. For reference, this is the hopping windowed store:

https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21

In the code that I have provided, there are going to be two 24h segments.
When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), such that it is the record time, not the clock time, which is used.

To the best of my understanding, the timestamps are retained when restoring from the changelog.

Basically, this is a heavy-handed way to deal with TTL at a segment level, instead of at an individual record level.

On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com> wrote:

Will that work? I expected it to blow up with ClassCastException or similar.

You either would have to specify the window you fetch/put or iterate across all windows the key was found in, right?
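To make the segment-level TTL idea concrete, here is a simplified model. It is not how Kafka's window store is implemented: segments here are non-overlapping (rather than the two overlapping 24h segments described above), the class name is hypothetical, and expiry is driven by the highest record timestamp seen rather than wall-clock time. It also shows why a plain get has to consult every live segment the key may have landed in.

```python
from typing import Dict, Optional


class SegmentedTtlStore:
    """Key-value store whose entries expire one whole segment at a time."""

    def __init__(self, segment_ms: int, num_segments: int):
        self.segment_ms = segment_ms
        self.num_segments = num_segments
        self.segments: Dict[int, Dict[str, int]] = {}
        self.observed_ms = -1  # highest record timestamp seen so far

    def _oldest_live_segment(self) -> int:
        return self.observed_ms // self.segment_ms - self.num_segments + 1

    def put(self, key: str, value: int, record_ts_ms: int) -> None:
        self.observed_ms = max(self.observed_ms, record_ts_ms)
        oldest = self._oldest_live_segment()
        seg_id = record_ts_ms // self.segment_ms
        if seg_id >= oldest:  # records older than retention are silently dropped
            self.segments.setdefault(seg_id, {})[key] = value
        # Roll over: drop every segment that has fallen out of retention.
        for expired in [s for s in self.segments if s < oldest]:
            del self.segments[expired]

    def get(self, key: str) -> Optional[int]:
        # The key may live in several segments, so check from newest to oldest.
        for seg_id in sorted(self.segments, reverse=True):
            if key in self.segments[seg_id]:
                return self.segments[seg_id][key]
        return None


HOUR = 3_600_000
store = SegmentedTtlStore(segment_ms=12 * HOUR, num_segments=2)
store.put("k", 1, record_ts_ms=0)
store.put("k", 2, record_ts_ms=13 * HOUR)
print(store.get("k"))  # 2: newest segment wins
store.put("other", 3, record_ts_ms=37 * HOUR)
print(store.get("k"))  # None: both segments holding "k" have expired
```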
I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.

If it does: did you see my comment on checking all the windows earlier? That would be needed to actually give reasonable time guarantees.

Best

On 25.09.2018 13:18, Adam Bellemare wrote:
Hi Jan

Check for " highwaterMat " in the PR. I only changed the state store, not the ProcessorSupplier.

Thanks,
Adam

On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 24.09.2018 16:26, Adam Bellemare wrote:
> @Guozhang
>
> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
>
> @Jan
> Thanks for your explanations.
> That being said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution as you propose. However, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet that as an alternate solution. As it stands right now, I do not really have more time to invest into alternatives without there being a strong indication from the binding voters which they would prefer.

Hey, no worries at all. I think I personally gave up on the Streams DSL some time ago, otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on PAPI.
> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>
> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
> https://github.com/apache/kafka/pull/5527
>
> Thanks,
>
> Adam

Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to Windowed<K>,Long. Am I missing something?
On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms. But as part of KIP-258 we do want to have "handling out-of-order data for source KTable" such that, instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamps, i.e. following timestamp ordering.

Guozhang

On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another ongoing KIP may actually help:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB

This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTable), and then one of its usages, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high watermark based filtering, while you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
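The timestamp-based rejection described here can be sketched as a per-key store that keeps each value's timestamp next to it. This is a minimal model of the idea behind KIP-258, not its API; the class name is hypothetical. Keeping the timestamp for deletes as well means a late, older update cannot resurrect a key that has already been deleted.

```python
from typing import Dict, Optional, Tuple


class TimestampOrderedStore:
    """Keeps (value, timestamp) per key and rejects updates older than the latest one."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[Optional[str], int]] = {}

    def put(self, key: str, value: Optional[str], ts: int) -> bool:
        current = self._data.get(key)
        if current is not None and ts < current[1]:
            return False  # out-of-order update: older than what was already applied
        # value=None acts as a tombstone, but its timestamp is still remembered.
        self._data[key] = (value, ts)
        return True

    def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        return None if entry is None else entry[0]


ts_store = TimestampOrderedStore()
ts_store.put("k", "v2", ts=20)
print(ts_store.put("k", "v1", ts=10))  # False: rejected, timestamp 10 < 20
print(ts_store.get("k"))               # v2
ts_store.put("k", None, ts=30)         # delete
print(ts_store.put("k", "v0", ts=25))  # False: cannot resurrect the deleted key
```

As the message notes, this only works end to end if record timestamps are carried through every stage of the topology.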
Note that this KIP is for key-value stores and hence non-windowed KTables only, but for windowed KTables we do not really have good support for their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I think we can just consider non-windowed KTable-KTable non-key joins for now, in which case KIP-258 should help.

Guozhang

On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 11.09.2018 18:00, Adam Bellemare wrote:

> Hi Guozhang
>
> The current high-water mark implementation would grow endlessly based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>).
> This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a windowed state store of duration N. This would allow the same behaviour, but cap the size based on time. This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (i.e. perhaps just 10 minutes of window, or perhaps 7 days...).

Hi Adam, using time-based retention can do the trick here. Even so, I would still like the automatic repartitioning to be optional, since I would just reshuffle again.
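Adam's proposal, a per-primary-key high-water mark (the highest offset seen) whose entries are only kept for a configurable duration N, can be modeled roughly as follows. `WindowedHighWaterMark`, `accept` and the expire-on-access strategy are hypothetical stand-ins for a real windowed state store; the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical plain-Java model of a high-water-mark filter whose entries are
 * retained only for retentionMs (a stand-in for a windowed store of duration N).
 */
class WindowedHighWaterMark<K> {

    /** Highest offset seen for a key, and when that mark was last advanced. */
    private static final class Mark {
        final long offset;
        final long timestampMs;

        Mark(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    private final long retentionMs;
    private final Map<K, Mark> marks = new HashMap<>();

    WindowedHighWaterMark(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Drops marks older than the retention period, capping state size by time. */
    private void expire(long nowMs) {
        Iterator<Mark> it = marks.values().iterator();
        while (it.hasNext()) {
            if (it.next().timestampMs < nowMs - retentionMs) {
                it.remove();
            }
        }
    }

    /**
     * Returns true for a new update (offset above the key's high-water mark, or
     * no mark retained), false for a late arrival that should be dropped.
     */
    boolean accept(K key, long offset, long nowMs) {
        expire(nowMs);
        Mark mark = marks.get(key);
        if (mark != null && offset <= mark.offset) {
            return false;
        }
        marks.put(key, new Mark(offset, nowMs));
        return true;
    }

    /** Number of keys currently tracked. */
    int size() {
        return marks.size();
    }
}
```

Once a key's mark has expired, a late event for that key is accepted again; bounded state is bought at the price of only resolving out-of-order arrival within the retention window.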
With windowed store I >> am a >> > > > little >> > > > > > bit >> > > > > > >>>>>>>>>>>> sceptical >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> about >> > > > > > >>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> how to determine the window. So esentially one >> > > could >> > > > > run >> > > > > > >>>>> into >> > > > > > >>>>>>>>>>>> problems >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> when >> > > > > > >>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> the rapid change happens near a window >> border. I >> > > will >> > > > > > check >> > > > > > >>>>> you >> > > > > > >>>>>>>>>>>>>>>>>> implementation in detail, if its >> problematic, we >> > > > could >> > > > > > >>>>> still >> > > > > > >>>>>>>>> check >> > > > > > >>>>>>>>>>>>>>>>>> _all_ >> > > > > > >>>>>>>>>>>>>>>>>> windows on read with not to bad performance >> > > impact I >> > > > > > >> guess. >> > > > > > >>>>>>> Will >> > > > > > >>>>>>>>>>>> let >> > > > > > >>>>>>>>>>>>>>>>>> you >> > > > > > >>>>>>>>>>>>>>>>>> know if the implementation would be correct >> as >> > > is. I >> > > > > > >>>>> wouldn't >> > > > > > >>>>>>> not >> > > > > > >>>>>>>>>>>> like >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> to >> > > > > > >>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) => >> > > timestamp(A) < >> > > > > > >>>>>>>>> timestamp(B). >> > > > > > >>>>>>>>>>>> I >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> think >> > > > > > >>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>> we can't expect that. >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>>>>>>> @Jan >> > > > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - >> > thanks >> > > > for >> > > > > > the >> > > > > > >>>>>>>>>>>> diagram, it >> > > > > > >>>>>>>>>>>>>>>>>>> did really help. 
> You are correct that I do not have the original primary key available, and I can see that if it was available then you would be able to add and remove events from the Map. That being said, I encourage you to finish your diagrams / charts just for clarity for everyone else.

Yeah 100%, this giphy thing is just really hard work. But I understand the benefits for the rest. Sorry about the original primary key. We have implemented join and groupBy ourselves in the PAPI and are basically not using any of the DSL (just the abstraction). I completely missed that it's not there in the original DSL and just assumed it. Total brain mess-up on my end.
Will finish the chart as soon as I get a quiet evening this week.

> My follow-up question for you is, won't the Map stay inside the state store indefinitely after all of the changes have propagated? Isn't this effectively the same as a high-water mark state store?

The thing is that if the map is empty, the subtractor is going to return `null` and the key is removed from the keyspace. But there is going to be a store, 100%. The good thing is that I can use this store directly for materialize() / enableSendingOldValues(); it is a regular store, satisfying all guarantees needed for further groupBy / join.
The windowed store is not keeping the values, so for the next stateful operation we would need to instantiate an extra store, or have the window store also hold the values.

Long story short: if we can flip in a custom groupBy before repartitioning to the original primary key, I think it would help users big time in building efficient apps. Given the original primary key issue, I understand that we do not have a solid foundation to build on. Leaving primary-key carry-along to the user is very unfortunate. I can understand the decision going that way, but I do not think it's a good decision.
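Jan's argument that a map-based aggregate does not linger in the store, because the subtractor returns `null` once the map is empty, can be illustrated with the plain-Java model below, which mirrors the adder/subtractor snippet Jan posts further down the thread. `MapAggregateTable`, `apply` and `primaryKeyOf` are hypothetical names, not the Kafka Streams Aggregator API, and the sketch assumes, as Jan does, that the original primary key is carried inside the joined value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical plain-Java model of a groupBy aggregate whose value is a map
 * from each row's original primary key to the row. Illustrative only.
 */
class MapAggregateTable<G, PK, V> {

    // Extracts the original primary key that the joiner carried along in the value.
    private final Function<V, PK> primaryKeyOf;
    private final Map<G, Map<PK, V>> table = new HashMap<>();

    MapAggregateTable(Function<V, PK> primaryKeyOf) {
        this.primaryKeyOf = primaryKeyOf;
    }

    /** Applies one adder (isAdder = true) or subtractor (isAdder = false) call. */
    void apply(G groupKey, V value, boolean isAdder) {
        Map<PK, V> updated = new HashMap<>(table.getOrDefault(groupKey, Map.of()));
        PK pk = primaryKeyOf.apply(value);
        if (isAdder) {
            updated.put(pk, value);   // add or overwrite this row
        } else {
            updated.remove(pk);       // retract exactly this row
        }
        if (updated.isEmpty()) {
            table.remove(groupKey);   // empty aggregate acts as null: key leaves the keyspace
        } else {
            table.put(groupKey, updated);
        }
    }

    /** Current aggregate for a group, or null if the group is absent. */
    Map<PK, V> get(G groupKey) {
        return table.get(groupKey);
    }

    /** Number of group keys currently present. */
    int size() {
        return table.size();
    }
}
```

Because entries are keyed by the original primary key, a row that moves to a different foreign key is retracted from its old group precisely; without that key in the value there is nothing to remove by, which is the gap discussed above.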
> Thanks
> Adam

On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com> wrote:

please remove me from this group

On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Adam,

give me some time, I will make such a chart. Last time I didn't get along well with giphy and ruined all your charts.
Hopefully I can get it done today.

On 08.09.2018 16:00, Adam Bellemare wrote:

Hi Jan

I have included a diagram of what I attempted on the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate

I attempted this back at the start of my own implementation of this solution, and since I could not get it to work I have since discarded the code.
At this point in time, if you wish to continue pursuing your groupBy solution, I ask that you please create a diagram on the KIP carefully explaining your solution. Please feel free to use the image I just posted as a starting point. I am having trouble understanding your explanations, but I think that a carefully constructed diagram will clear up any misunderstandings. Alternately, please post a comprehensive PR with your solution. I can only guess at what you mean, and since I value my own time as much as you value yours, I believe it is your responsibility to provide an implementation instead of me trying to guess.
Adam

On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi James,

nice to see you being interested. Kafka Streams at this point supports all sorts of joins as long as both streams have the same key. Adam is currently implementing a join where a KTable and a KTable can have a one-to-many relationship (1:n).
We exploit that RocksDB is a datastore that keeps data sorted (or at least exposes an API to access the stored data in a sorted fashion).

I think the technical caveats are well understood now and we are basically down to philosophy and API design (when Adam sees my newest message). I have a lengthy track record of losing these kinds of arguments within the Streams community, and I have no clue why.
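Jan's remark that RocksDB keeps data sorted is what makes the 1:n lookup cheap: if left-hand rows are stored under a composite key ordered by foreign key first and primary key second, all rows referencing one right-hand key are contiguous and can be found with a single seek plus forward scan. The sketch below uses a `TreeMap` as a stand-in for RocksDB's sorted iteration; `FkPk` and `SortedOneToManyIndex` are hypothetical names, and a real store would compare serialized key bytes rather than Java objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Composite key ordered by foreign key first, then primary key. */
final class FkPk implements Comparable<FkPk> {
    final String fk;
    final String pk;

    FkPk(String fk, String pk) {
        this.fk = fk;
        this.pk = pk;
    }

    @Override
    public int compareTo(FkPk o) {
        int c = fk.compareTo(o.fk);
        return c != 0 ? c : pk.compareTo(o.pk);
    }
}

/** Hypothetical sorted index (TreeMap standing in for RocksDB) keyed by (fk, pk). */
class SortedOneToManyIndex {
    private final TreeMap<FkPk, String> store = new TreeMap<>();

    void put(String fk, String pk, String value) {
        store.put(new FkPk(fk, pk), value);
    }

    void remove(String fk, String pk) {
        store.remove(new FkPk(fk, pk));
    }

    /** Primary keys of all left-hand rows whose foreign key is fk, in pk order. */
    List<String> primaryKeysFor(String fk) {
        List<String> result = new ArrayList<>();
        // Seek to the first entry for this fk ("" sorts before any other pk), then scan forward.
        for (Map.Entry<FkPk, String> e : store.tailMap(new FkPk(fk, ""), true).entrySet()) {
            if (!e.getKey().fk.equals(fk)) {
                break; // left this fk's contiguous range
            }
            result.add(e.getKey().pk);
        }
        return result;
    }
}
```

When a right-hand record for key `2` changes, `primaryKeysFor("2")` yields every left-hand row that must be re-joined, without a full table scan.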
So I literally can't wait for you to churn through this thread and give your opinion on how we should design the return type of the oneToManyJoin, and how much power we want to give to the user vs. "simplicity" (where, I argue, simplicity isn't really that, as users still need to understand it).

Waiting for you to join in on the discussion.

Best,
Jan

On 07.09.2018 15:49, James Kwan wrote:

I am new to this group and I found this subject interesting. Sounds like you guys want to implement a join table of two streams?
Is there somewhere I can see the original requirement or proposal?

On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 05.09.2018 22:17, Adam Bellemare wrote:

> I'm currently testing using a Windowed Store to store the highwater mark. By all indications this should work fine, with the caveat being that it can only resolve out-of-order arrival for up to the size of the window (i.e. 24h, 72h, etc).
> This would remove the possibility of it being unbounded in size.
>
> With regards to Jan's suggestion, I believe this is where we will have to remain in disagreement. While I do not disagree with your statement about there likely being additional joins done in a real-world workflow, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins.
> I have attempted what I think you have proposed (without a high-water mark, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out-of-order and be incorrectly propagated, such that an intermediate event is represented as the final event.

Can you shed some light on your groupBy implementation? There must be some sort of flaw in it. I have a suspicion where it is; I would just like to confirm. The idea is bulletproof and it must be an implementation mess-up.
I would like to clarify before we draw a conclusion.

> Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.
>
> If you have some code to share that illustrates your approach, I would be very grateful, as it would remove any misunderstandings that I may have.

Ah okay, you were looking for my code.
I don't have something easily readable here, as it's bloated with OO patterns. It's trivial anyhow:

    @Override
    public T apply(K aggKey, V value, T aggregate)
    {
        Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary helper
        U toModifyKey = mapper.apply(value);
        // This is the place where people are actually going to have issues, and why
        // you probably couldn't do it. We would need to find a solution here.
        // I didn't realize that yet.
        // We propagate the field in the joiner, so that we can pick it up in an
        // aggregate. Probably you have not thought of this in your approach, right?
        // I am very open to finding a generic solution here. In my honest opinion,
        // this is broken in KTableImpl.GroupBy: it loses the keys and only
        // maintains the aggregate key.
        // I abstracted it away back then, way before I was thinking of the
        // oneToMany join. That is why I didn't realize its significance here.
        // Opinions?

        for (V m : current)
        {
            currentStateAsMap.put(mapper.apply(m), m);
        }
        if (isAdder)
        {
            currentStateAsMap.put(toModifyKey, value);
        }
        else
        {
            currentStateAsMap.remove(toModifyKey);
            if (currentStateAsMap.isEmpty()) {
                return null; // empty aggregate: the key is removed from the keyspace
            }
        }
        return asAggregateType(currentStateAsMap);
    }

> Thanks,
> Adam

On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
Thanks Adam for bringing Matthias up to speed about the differences. I think re-keying back should be optional at best. I would say we return a KScatteredTable with reshuffle() returning KTable<originalKey,Joined> to make the backwards repartitioning optional. I am also strongly in favour of doing the out-of-order processing using groupBy instead of high-watermark tracking. Unbounded growth is just scary, and it saves us the header stuff.

I think the abstraction of always repartitioning back is just not so strong.
The work has already been done before we partition back, and grouping by something else afterwards is really common.

On 05.09.2018 13:49, Adam Bellemare wrote:

Hi Matthias,

Thank you for your feedback, I do appreciate it!

> While namespacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest no namespacing for now to avoid the overhead.
> If this becomes a problem in the future, we can still add namespacing later on.

Agreed. I will go with using a reserved string and document it.

> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,

In your example, you have table1 and table2 swapped. Here is how it works currently:

1) table1 has the records that contain the foreign key within their value.
table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
table2 input stream: <A,X>, <B,Y>

2) A value mapper is required to extract the foreign key.
table1 foreign key mapper: ( value => value.fk )

The mapper is applied to each element in table1, and a new combined key is made:
table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>

3) The rekeyed events are copartitioned with table2:

a) Stream Thread with Partition 0:
RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
Table2: <A,X>

b) Stream Thread with Partition 1:
RepartitionedTable1: <B-c, (fk=B,bar=3)>
Table2: <B,Y>

4) From here, they can be joined together locally by applying the joiner function.

At this point, Jan's design and my design deviate. My design goes on to repartition the data post-join and resolve out-of-order arrival of records, finally returning the data keyed by just the original key. I do not expose the CombinedKey or any of the internals outside of the joinOnForeignKey function.
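To make steps 1-4 concrete, the Java below is a small in-memory simulation using the example records above. It is only a sketch under stated assumptions: `ForeignKeyJoinSketch`, `partitionFor` and `joiner` are hypothetical names, a simple hash-modulo partitioner stands in for the real partitioning, and plain maps stand in for topics and state stores. None of this is the Kafka Streams API or the KIP's implementation. What it shows is that partitioning table1 on the foreign-key part of the combined key puts `A-a`, `A-b` and table2's `A` on the same partition, so step 4 needs only local lookups.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of steps 1-4; it does not use the Kafka Streams API.
class ForeignKeyJoinSketch {

    // table1 value: holds the foreign key and a payload field, e.g. (fk=A,bar=1)
    static final class Table1Value {
        final String fk;
        final int bar;

        Table1Value(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Step 2: combined key "<foreign key>-<primary key>", e.g. "A-a"
    static String combinedKey(String fk, String pk) {
        return fk + "-" + pk;
    }

    // Step 3 (assumption): a simple hash-modulo partitioner applied to the foreign key only,
    // so "A-a", "A-b" and table2's "A" all land on the same partition
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical joiner function
    static String joiner(Table1Value left, String right) {
        return right + ":" + left.bar;
    }

    static Map<String, String> join(Map<String, Table1Value> table1,
                                    Map<String, String> table2,
                                    int numPartitions) {
        List<Map<String, Table1Value>> repartitioned1 = new ArrayList<>();
        List<Map<String, String>> partitions2 = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            repartitioned1.add(new LinkedHashMap<>());
            partitions2.add(new LinkedHashMap<>());
        }
        // table2 is partitioned by its own key
        for (Map.Entry<String, String> e : table2.entrySet()) {
            partitions2.get(partitionFor(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        // Steps 2 and 3: rekey table1 by the combined key, partition by the foreign key
        for (Map.Entry<String, Table1Value> e : table1.entrySet()) {
            Table1Value v = e.getValue();
            repartitioned1.get(partitionFor(v.fk, numPartitions))
                    .put(combinedKey(v.fk, e.getKey()), v);
        }
        // Step 4: each partition joins using only its local table2 records
        Map<String, String> results = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            Map<String, String> localTable2 = partitions2.get(p);
            for (Map.Entry<String, Table1Value> e : repartitioned1.get(p).entrySet()) {
                String right = localTable2.get(e.getValue().fk);
                if (right != null) {
                    results.put(e.getKey(), joiner(e.getValue(), right));
                }
            }
        }
        return results;
    }
}
```

With the example input and two partitions, `join` returns `A-a -> X:1`, `A-b -> X:2` and `B-c -> Y:3`. The partition count does not change the result, because each table1 record is always routed to the partition holding its table2 record.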
My design does make for a larger footprint, but it takes the responsibility for resolving out-of-order arrivals and handling CombinedKeys away from the user. I believe that this makes the function much easier to use.

Let me know if this helps resolve your questions, and please feel free to add anything else on your mind.

Thanks again,
Adam

On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I am just catching up on this thread.
I have not read everything yet, but want to share a couple of initial thoughts:

Headers: I think there is a fundamental difference between header usage in this KIP and KIP-258. For 258, we add headers to changelog topics that are owned by Kafka Streams, and nobody else is supposed to write into them. In fact, no user headers are written into the changelog topic, and thus there are no conflicts.

Nevertheless, I don't see a big issue with using headers within Streams.
As long as we document it, we can have some "reserved" header keys that users are not allowed to use when processing data with Kafka Streams. IMHO, this should be ok.

> I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
> For internal and changelog topics, we can namespace all headers:
> * user-defined headers are namespaced as "external." + headerKey
> * internal headers are namespaced as "internal." + headerKey

While namespacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest no namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add namespacing later on.

My main concern about the design is the type of the result KTable: If I understood the proposal correctly,

    KTable<K1,V1> table1 = ...
    KTable<K2,V2> table2 = ...

    KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that the `joinedTable` has the same key as the left input table. IMHO, this does not work, because if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign key join), the result table would only contain a single join result, not multiple.

Example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use the table2 value as a foreign key to the table1 key (ie, "A" joins). If the result key is the same as the key of table1, this implies that the result can either be <A, join(X,1)> or <A, join(X,2)>, but not both. Because they share the same key, whatever result record we emit later overwrites the previous result.
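The overwrite argument can be reproduced with plain Java maps standing in for the result KTable. This is an illustrative assumption (like a KTable, a map holds one value per key), and `ResultKeySketch` and `Row` are hypothetical names, not part of any proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain maps stand in for the result KTable: like a KTable, a map keeps one value per key.
class ResultKeySketch {

    // table2 value such as (A,1): the foreign key into table1 plus a payload
    static final class Row {
        final String fk;
        final int val;

        Row(String fk, int val) {
            this.fk = fk;
            this.val = val;
        }
    }

    // Joins table2 rows to table1 via the foreign key. With combinedKey == false the
    // result is keyed by table1's key only; with true, by "<table1 key>-<table2 key>".
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, Row> table2,
                                    boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : table2.entrySet()) {
            Row row = e.getValue();
            String left = table1.get(row.fk);
            if (left == null) {
                continue; // no matching table1 record
            }
            String key = combinedKey ? row.fk + "-" + e.getKey() : row.fk;
            // put() replaces any earlier value stored under the same key
            result.put(key, "join(" + left + "," + row.val + ")");
        }
        return result;
    }
}
```

Keyed by table1's key alone, the example yields only `A -> join(X,2)`, because the second result replaces the first.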

This is the reason why Jan originally proposed to use a combination of both primary keys of the input tables as the key of the output table. This makes the keys of the output table unique, and we can store both in the output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>

Thoughts?

-Matthias

On 9/4/18 1:36 PM, Jan Filipiak wrote:

Just one remark here.
The high-watermark could be disregarded.
The decision about the forward depends on the size of the aggregated map. Only maps with exactly 1 element would be unpacked and forwarded. Maps with 0 elements would be published as a delete. Any other count of map entries is in the "waiting for correct deletes to arrive" state.

On 04.09.2018 21:29, Adam Bellemare wrote:

It does look like I could replace the second repartition store and highwater store with a groupBy and reduce.
However, it looks like I would still need to store the highwater value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...). This in effect is the same as the design I have now, just with the two tables merged together.
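Jan's size-based forwarding rule, combined with the adder/subtractor map aggregation from the code fragment earlier in this thread, can be sketched in plain Java. This is a hypothetical, framework-free illustration: `MapAggregateSketch`, `update`, `Decision` and `Action` are invented names, and the per-key map stands in for the materialized groupBy/aggregate store. Under Jan's proposal, an out-of-order add leaves two entries in the map (WAIT) until the matching delete arrives, rather than being compared against a stored high-watermark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of Jan's size-based forwarding rule; not the KIP implementation.
class MapAggregateSketch {

    // What to emit downstream for one original key after an update
    enum Action { FORWARD, DELETE, WAIT }

    static final class Decision {
        final Action action;
        final String value; // only non-null for FORWARD

        Decision(Action action, String value) {
            this.action = action;
            this.value = value;
        }
    }

    // original key -> (combined key -> joined value), i.e. the aggregated map per key
    private final Map<String, Map<String, String>> state = new LinkedHashMap<>();

    Decision update(String originalKey, String combinedKey, String value, boolean isAdder) {
        Map<String, String> current =
                state.computeIfAbsent(originalKey, k -> new LinkedHashMap<>());
        if (isAdder) {
            current.put(combinedKey, value); // adder: insert or overwrite
        } else {
            current.remove(combinedKey);     // subtractor: drop the retracted entry
        }
        if (current.isEmpty()) {
            state.remove(originalKey);
            return new Decision(Action.DELETE, null); // 0 entries: publish a delete
        }
        if (current.size() == 1) {
            // exactly 1 entry: unpack it and forward its value
            return new Decision(Action.FORWARD, current.values().iterator().next());
        }
        return new Decision(Action.WAIT, null); // more than 1: wait for the correct delete
    }
}
```

For a record `a` whose foreign key changes from A to B, the add for `B-a` may arrive before the delete for `A-a`; the sketch then emits FORWARD (for `A-a`), WAIT, and finally FORWARD with the `B-a` value.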
-- Guozhang