If they don't find the time: they usually take the opposite path from me :D so the answer would be clear;
hence my suggestion to vote. On 04.12.2018 21:06, Adam Bellemare wrote: > Hi Guozhang and Matthias > > I know both of you are quite busy, but we've gotten this KIP to a point > where we need more guidance on the API (perhaps a bit of a tie-breaker, if > you will). If you have anyone else you may think should look at this, > please tag them accordingly. > > The scenario is as such: > > Current Option: > API: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces > 1) Rekey the data to CombinedKey, and shuffles it to the partition with the > foreignKey (repartition 1) > 2) Join the data > 3) Shuffle the data back to the original node (repartition 2) > 4) Resolve out-of-order arrival / race condition due to foreign-key changes. > > Alternate Option: > Perform #1 and #2 above, and return a KScatteredTable. > - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO > = Other Table Key, K = This Table Key, VR = Joined Result) > - KScatteredTable.resolve() would perform #3 and #4 but otherwise a user > would be able to perform additional functions directly from the > KScatteredTable (TBD - currently out of scope). > - John's analysis 2-emails up is accurate as to the tradeoffs. > > Current Option is coded as-is. Alternate option is possible, but will > require for implementation details to be made in the API and some exposure > of new data structures into the API (ie: CombinedKey). > > I appreciate any insight into this. > > Thanks. > > On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com> > wrote: > >> Hi John >> >> Thanks for your feedback and assistance. I think your summary is accurate >> from my perspective. Additionally, I would like to add that there is a risk >> of inconsistent final states without performing the resolution. This is a >> major concern for me as most of the data I have dealt with is produced by >> relational databases. We have seen a number of cases where a user in the >> Rails UI has modified the field (foreign key), realized they made a >> mistake, and then updated the field again with a new key. The events are >> propagated out as they are produced, and as such we have had real-world >> cases where these inconsistencies were propagated downstream as the final >> values due to the race conditions in the fanout of the data. >> >> This solution that I propose values correctness of the final result over >> other factors. >> >> We could always move this function over to using a KScatteredTable >> implementation in the future, and simply deprecate it this join API in >> time. I think I would like to hear more from some of the other major >> committers on which course of action they would think is best before any >> more coding is done. >> >> Thanks again >> >> Adam >> >> >> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote: >> >>> Hi Jan and Adam, >>> >>> Wow, thanks for doing that test, Adam. Those results are encouraging. >>> >>> Thanks for your performance experience as well, Jan. I agree that avoiding >>> unnecessary join outputs is especially important when the fan-out is so >>> high. I suppose this could also be built into the implementation we're >>> discussing, but it wouldn't have to be specified in the KIP (since it's an >>> API-transparent optimization). 
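For concreteness, here is a rough sketch of how the two options above might read at a call site. Nothing in it is final API: joinOnForeignKey, KScatteredTable, resolve() and CombinedKey are the names used in the KIP, while the Order/Customer/EnrichedOrder types and the customerId field are invented for illustration.

    KTable<Long, Order> orders = builder.table("orders");            // K  = Long
    KTable<String, Customer> customers = builder.table("customers"); // KO = String

    // Current Option: steps 1-4 all happen inside the operator; the caller gets
    // back an ordinary KTable keyed on the original (orders) key.
    KTable<Long, EnrichedOrder> joined = orders.joinOnForeignKey(
            customers,
            order -> order.customerId,                     // foreign-key extractor
            (order, customer) -> new EnrichedOrder(order, customer));

    // Alternate Option: stop after steps 1-2 and hand back the scattered
    // intermediate, keyed on CombinedKey; resolve() performs steps 3-4 explicitly.
    KScatteredTable<CombinedKey<String, Long>, EnrichedOrder> scattered =
            orders.joinOnForeignKey(
                    customers,
                    order -> order.customerId,
                    (order, customer) -> new EnrichedOrder(order, customer));
    KTable<Long, EnrichedOrder> resolved = scattered.resolve();      // steps 3-4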
>>> >>> As far as whether or not to re-repartition the data, I didn't bring it up >>> because it sounded like the two of you agreed to leave the KIP as-is, >>> despite the disagreement. >>> >>> If you want my opinion, I feel like both approaches are reasonable. >>> It sounds like Jan values more the potential for developers to optimize >>> their topologies to re-use the intermediate nodes, whereas Adam places >>> more >>> value on having a single operator that people can use without extra steps >>> at the end. >>> >>> Personally, although I do find it exceptionally annoying when a framework >>> gets in my way when I'm trying to optimize something, it seems better to >>> go >>> for a single operation. >>> * Encapsulating the internal transitions gives us significant latitude in >>> the implementation (for example, joining only at the end, not in the >>> middle >>> to avoid extra data copying and out-of-order resolution; how we represent >>> the first repartition keys (combined keys vs. value vectors), etc.). If we >>> publish something like a KScatteredTable with the right-partitioned joined >>> data, then the API pretty much locks in the implementation as well. >>> * The API seems simpler to understand and use. I do mean "seems"; if >>> anyone >>> wants to make the case that KScatteredTable is actually simpler, I think >>> hypothetical usage code would help. From a relational algebra perspective, >>> it seems like KTable.join(KTable) should produce a new KTable in all >>> cases. >>> * That said, there might still be room in the API for a different >>> operation >>> like what Jan has proposed to scatter a KTable, and then do things like >>> join, re-group, etc from there... I'm not sure; I haven't thought through >>> all the consequences yet. >>> >>> This is all just my opinion after thinking over the discussion so far... >>> -John >>> >>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com> >>> wrote: >>> >>>> Updated the PR to take into account John's feedback. >>>> >>>> I did some preliminary testing for the performance of the prefixScan. I >>>> have attached the file, but I will also include the text in the body >>> here >>>> for archival purposes (I am not sure what happens to attached files). I >>>> also updated the PR and the KIP accordingly. >>>> >>>> Summary: It scales exceptionally well for scanning large values of >>>> records. As Jan mentioned previously, the real issue would be more >>> around >>>> processing the resulting records after obtaining them. For instance, it >>>> takes approximately ~80-120 mS to flush the buffer and a further >>> ~35-85mS >>>> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating >>>> through the records just to generate a simple count takes ~ 40 times >>> longer >>>> than the flush + scan combined. >>>> >>>> >>> ============================================================================================ >>>> Setup: >>>> >>>> >>> ============================================================================================ >>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m) >>>> CPU: i7 2.2 Ghz. >>>> >>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams >>>> RocksDB >>>> implementation (RocksDB.java, basically just avoiding the >>>> ProcessorContext). >>>> There are no modifications to the default RocksDB values provided in the >>>> 2.1/trunk release. 
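Since the store in this test is accessed directly, here is roughly what the measured operation amounts to against the public state-store API, with range() standing in for prefixScan (the store variable and its String typing are assumptions; every key starting with the prefix sorts between prefix and prefix + '\uffff'):

    String prefix = "someprefix-";
    long matches = 0;
    try (KeyValueIterator<String, String> iter =
                 store.range(prefix, prefix + '\uffff')) {  // the "Scan Time" part
        while (iter.hasNext()) {                            // Step 5 below; per the
            iter.next();                                    // numbers, iteration
            matches++;                                      // dominates the scan
        }
    }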
>>>> >>>> >>>> keysize = 128 bytes >>>> valsize = 512 bytes >>>> >>>> Step 1: >>>> Write X positive matching events: (key = prefix + left-padded >>>> auto-incrementing integer) >>>> Step 2: >>>> Write 10X negative matching events (key = left-padded auto-incrementing >>>> integer) >>>> Step 3: >>>> Perform flush >>>> Step 4: >>>> Perform prefixScan >>>> Step 5: >>>> Iterate through return Iterator and validate the count of expected >>> events. >>>> >>>> >>>> >>> ============================================================================================ >>>> Results: >>>> >>>> >>> ============================================================================================ >>>> X = 1k (11k events total) >>>> Flush Time = 39 mS >>>> Scan Time = 7 mS >>>> 6.9 MB disk >>>> >>>> >>> -------------------------------------------------------------------------------------------- >>>> X = 10k (110k events total) >>>> Flush Time = 45 mS >>>> Scan Time = 8 mS >>>> 127 MB >>>> >>>> >>> -------------------------------------------------------------------------------------------- >>>> X = 100k (1.1M events total) >>>> Test1: >>>> Flush Time = 60 mS >>>> Scan Time = 12 mS >>>> 678 MB >>>> >>>> Test2: >>>> Flush Time = 45 mS >>>> Scan Time = 7 mS >>>> 576 MB >>>> >>>> >>> -------------------------------------------------------------------------------------------- >>>> X = 1MB (11M events total) >>>> Test1: >>>> Flush Time = 52 mS >>>> Scan Time = 19 mS >>>> 7.2 GB >>>> >>>> Test2: >>>> Flush Time = 84 mS >>>> Scan Time = 34 mS >>>> 9.1 GB >>>> >>>> >>> -------------------------------------------------------------------------------------------- >>>> X = 2.5M (27.5M events total) >>>> Test1: >>>> Flush Time = 82 mS >>>> Scan Time = 63 mS >>>> 17GB - 276 sst files >>>> >>>> Test2: >>>> Flush Time = 116 mS >>>> Scan Time = 35 mS >>>> 23GB - 361 sst files >>>> >>>> Test3: >>>> Flush Time = 103 mS >>>> Scan Time = 82 mS >>>> 19 GB - 300 sst files >>>> >>>> >>> -------------------------------------------------------------------------------------------- >>>> >>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go >>>> to X = 10M (110M events) but RocksDB was going into the 100GB+ range >>> and my >>>> laptop ran out of disk. More extensive testing could be done but I >>> suspect >>>> that it would be in line with what we're seeing in the results above. >>>> >>>> >>>> >>>> >>>> >>>> >>>> At this point in time, I think the only major discussion point is really >>>> around what Jan and I have disagreed on: repartitioning back + resolving >>>> potential out of order issues or leaving that up to the client to >>> handle. >>>> >>>> >>>> Thanks folks, >>>> >>>> Adam >>>> >>>> >>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com> >>>> wrote: >>>> >>>>> >>>>> >>>>> On 29.11.2018 15:14, John Roesler wrote: >>>>>> Hi all, >>>>>> >>>>>> Sorry that this discussion petered out... I think the 2.1 release >>>>> caused an >>>>>> extended distraction that pushed it off everyone's radar (which was >>>>>> precisely Adam's concern). Personally, I've also had some extend >>>>>> distractions of my own that kept (and continue to keep) me >>> preoccupied. >>>>>> >>>>>> However, calling for a vote did wake me up, so I guess Jan was on the >>>>> right >>>>>> track! >>>>>> >>>>>> I've gone back and reviewed the whole KIP document and the prior >>>>>> discussion, and I'd like to offer a few thoughts: >>>>>> >>>>>> API Thoughts: >>>>>> >>>>>> 1. 
If I read the KIP right, you are proposing a many-to-one join. >>> Could >>>>> we >>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip the design >>>>> around >>>>>> and make it a oneToManyJoin? >>>>>> >>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it >>>>> seems >>>>>> like it might trick some people into using it for a one-to-one join. >>>>> This >>>>>> would work, of course, but it would be super inefficient compared to >>> a >>>>>> simple rekey-and-join. >>>>>> >>>>>> 2. I might have missed it, but I don't think it's specified whether >>>>> it's an >>>>>> inner, outer, or left join. I'm guessing an outer join, as >>> (neglecting >>>>> IQ), >>>>>> the rest can be achieved by filtering or by handling it in the >>>>> ValueJoiner. >>>>>> >>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right. >>>>>> 3a. Regarding Serialized: There are a few different paradigms in >>> play in >>>>>> the Streams API, so it's confusing, but instead of three Serialized >>>>> args, I >>>>>> think it would be better to have one that allows (optionally) setting >>>>> the 4 >>>>>> incoming serdes. The result serde is defined by the Materialized. The >>>>>> incoming serdes can be optional because they might already be >>> available >>>>> on >>>>>> the source KTables, or the default serdes from the config might be >>>>>> applicable. >>>>>> >>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow >>>>> setting >>>>>> one, and it seems like it might actually be harmful, since the rekey >>>>>> operation needs to produce results that are co-partitioned with the >>>>> "other" >>>>>> KTable. >>>>>> >>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually >>>>> follow >>>>>> what Matthias meant about namespacing requiring "deserializing" the >>>>> record >>>>>> header. The headers are already Strings, so I don't think that >>>>>> deserialization is required. If we applied the namespace at source >>> nodes >>>>>> and stripped it at sink nodes, this would be practically no overhead. >>>>> The >>>>>> advantage of the namespace idea is that no public API change wrt >>> headers >>>>>> needs to happen, and no restrictions need to be placed on users' >>>>> headers. >>>>>> >>>>>> (Although I'm wondering if we can get away without the header at >>> all... >>>>>> stay tuned) >>>>>> >>>>>> 5. I also didn't follow the discussion about the HWM table growing >>>>> without >>>>>> bound. As I read it, the HWM table is effectively implementing OCC to >>>>>> resolve the problem you noted with disordering when the rekey is >>>>>> reversed... particularly notable when the FK changes. As such, it >>> only >>>>>> needs to track the most recent "version" (the offset in the source >>>>>> partition) of each key. Therefore, it should have the same number of >>>>> keys >>>>>> as the source table at all times. >>>>>> >>>>>> I see that you are aware of KIP-258, which I think might be relevant >>> in >>>>> a >>>>>> couple of ways. One: it's just about storing the timestamp in the >>> state >>>>>> store, but the ultimate idea is to effectively use the timestamp as >>> an >>>>> OCC >>>>>> "version" to drop disordered updates. You wouldn't want to use the >>>>>> timestamp for this operation, but if you were to use a similar >>>>> mechanism to >>>>>> store the source offset in the store alongside the re-keyed values, >>> then >>>>>> you could avoid a separate table. >>>>>> >>>>>> 6. 
You and Jan have been thinking about this for a long time, so I've >>>>>> probably missed something here, but I'm wondering if we can avoid the >>>>> HWM >>>>>> tracking at all and resolve out-of-order during a final join >>> instead... >>>>>> >>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other >>>>> data)) >>>>>> to a right table (Letter K: (some data)). >>>>>> >>>>>> Left table: >>>>>> 1: (A, xyz) >>>>>> 2: (B, asd) >>>>>> >>>>>> Right table: >>>>>> A: EntityA >>>>>> B: EntityB >>>>>> >>>>>> We could do a rekey as you proposed with a combined key, but not >>>>>> propagating the value at all.. >>>>>> Rekey table: >>>>>> A-1: (dummy value) >>>>>> B-2: (dummy value) >>>>>> >>>>>> Which we then join with the right table to produce: >>>>>> A-1: EntityA >>>>>> B-2: EntityB >>>>>> >>>>>> Which gets rekeyed back: >>>>>> 1: A, EntityA >>>>>> 2: B, EntityB >>>>>> >>>>>> And finally we do the actual join: >>>>>> Result table: >>>>>> 1: ((A, xyz), EntityA) >>>>>> 2: ((B, asd), EntityB) >>>>>> >>>>>> The thing is that in that last join, we have the opportunity to >>> compare >>>>> the >>>>>> current FK in the left table with the incoming PK of the right >>> table. If >>>>>> they don't match, we just drop the event, since it must be outdated. >>>>>> >>>>> >>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to >>>>> (1: >>>>>> B, xyz), ultimately yielding a conundrum about whether the final >>> state >>>>>> should be (1: null) or (1: joined-on-B). With the algorithm above, >>> you >>>>>> would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, >>>>>> EntityB)). It seems like this does give you enough information to >>> make >>>>> the >>>>>> right choice, regardless of disordering. >>>>> >>>>> Will check Adams patch, but this should work. As mentioned often I am >>>>> not convinced on partitioning back for the user automatically. I think >>>>> this is the real performance eater ;) >>>>> >>>>>> >>>>>> >>>>>> 7. Last thought... I'm a little concerned about the performance of >>> the >>>>>> range scans when records change in the right table. You've said that >>>>> you've >>>>>> been using the algorithm you presented in production for a while. Can >>>>> you >>>>>> give us a sense of the performance characteristics you've observed? >>>>>> >>>>> >>>>> Make it work, make it fast, make it beautiful. The topmost thing here >>> is >>>>> / was correctness. In practice I do not measure the performance of the >>>>> range scan. Usual cases I run this with is emitting 500k - 1kk rows >>>>> on a left hand side change. The range scan is just the work you gotta >>>>> do, also when you pack your data into different formats, usually the >>>>> rocks performance is very tight to the size of the data and we can't >>>>> really change that. It is more important for users to prevent useless >>>>> updates to begin with. My left hand side is guarded to drop changes >>> that >>>>> are not going to change my join output. >>>>> >>>>> usually it's: >>>>> >>>>> drop unused fields and then don't forward if old.equals(new) >>>>> >>>>> regarding to the performance of creating an iterator for smaller >>>>> fanouts, users can still just do a group by first then anyways. >>>>> >>>>> >>>>> >>>>>> I could only think of one alternative, but I'm not sure if it's >>> better >>>>> or >>>>>> worse... If the first re-key only needs to preserve the original key, >>>>> as I >>>>>> proposed in #6, then we could store a vector of keys in the value: >>>>>> >>>>>> Left table: >>>>>> 1: A,... 
>>>>>> 2: B,... >>>>>> 3: A,... >>>>>> >>>>>> Gets re-keyed: >>>>>> A: [1, 3] >>>>>> B: [2] >>>>>> >>>>>> Then, the rhs part of the join would only need a regular single-key >>>>> lookup. >>>>>> Of course we have to deal with the problem of large values, as >>> there's >>>>> no >>>>>> bound on the number of lhs records that can reference rhs records. >>>>> Offhand, >>>>>> I'd say we could page the values, so when one row is past the >>>>> threshold, we >>>>>> append the key for the next page. Then in most cases, it would be a >>>>> single >>>>>> key lookup, but for large fan-out updates, it would be one per (max >>>>> value >>>>>> size)/(avg lhs key size). >>>>>> >>>>>> This seems more complex, though... Plus, I think there's some extra >>>>>> tracking we'd need to do to know when to emit a retraction. For >>> example, >>>>>> when record 1 is deleted, the re-key table would just have (A: [3]). >>>>> Some >>>>>> kind of tombstone is needed so that the join result for 1 can also be >>>>>> retracted. >>>>>> >>>>>> That's all! >>>>>> >>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the >>>>>> discussion has been slow. >>>>>> -John >>>>>> >>>>>> >>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak < >>> jan.filip...@trivago.com> >>>>>> wrote: >>>>>> >>>>>>> Id say you can just call the vote. >>>>>>> >>>>>>> that happens all the time, and if something comes up, it just goes >>> back >>>>>>> to discuss. >>>>>>> >>>>>>> would not expect to much attention with another another email in >>> this >>>>>>> thread. >>>>>>> >>>>>>> best Jan >>>>>>> >>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote: >>>>>>>> Hello Contributors >>>>>>>> >>>>>>>> I know that 2.1 is about to be released, but I do need to bump >>> this to >>>>>>> keep >>>>>>>> visibility up. I am still intending to push this through once >>>>> contributor >>>>>>>> feedback is given. >>>>>>>> >>>>>>>> Main points that need addressing: >>>>>>>> 1) Any way (or benefit) in structuring the current singular graph >>> node >>>>>>> into >>>>>>>> multiple nodes? It has a whopping 25 parameters right now. I am a >>> bit >>>>>>> fuzzy >>>>>>>> on how the optimizations are supposed to work, so I would >>> appreciate >>>>> any >>>>>>>> help on this aspect. >>>>>>>> >>>>>>>> 2) Overall strategy for joining + resolving. This thread has much >>>>>>> discourse >>>>>>>> between Jan and I between the current highwater mark proposal and a >>>>>>> groupBy >>>>>>>> + reduce proposal. I am of the opinion that we need to strictly >>> handle >>>>>>> any >>>>>>>> chance of out-of-order data and leave none of it up to the >>> consumer. >>>>> Any >>>>>>>> comments or suggestions here would also help. >>>>>>>> >>>>>>>> 3) Anything else that you see that would prevent this from moving >>> to a >>>>>>> vote? >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> Adam >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare < >>>>>>> adam.bellem...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Jan >>>>>>>>> >>>>>>>>> With the Stores.windowStoreBuilder and >>> Stores.persistentWindowStore, >>>>> you >>>>>>>>> actually only need to specify the amount of segments you want and >>> how >>>>>>> large >>>>>>>>> they are. To the best of my understanding, what happens is that >>> the >>>>>>>>> segments are automatically rolled over as new data with new >>>>> timestamps >>>>>>> are >>>>>>>>> created. We use this exact functionality in some of the work done >>>>>>>>> internally at my company. 
For reference, this is the hopping >>> windowed >>>>>>> store. >>>>>>>>> >>>>>>>>> >>>>>>> >>>>> >>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21 >>>>>>>>> >>>>>>>>> In the code that I have provided, there are going to be two 24h >>>>>>> segments. >>>>>>>>> When a record is put into the windowStore, it will be inserted at >>>>> time >>>>>>> T in >>>>>>>>> both segments. The two segments will always overlap by 12h. As >>> time >>>>>>> goes on >>>>>>>>> and new records are added (say at time T+12h+), the oldest segment >>>>> will >>>>>>> be >>>>>>>>> automatically deleted and a new segment created. The records are >>> by >>>>>>> default >>>>>>>>> inserted with the context.timestamp(), such that it is the record >>>>> time, >>>>>>> not >>>>>>>>> the clock time, which is used. >>>>>>>>> >>>>>>>>> To the best of my understanding, the timestamps are retained when >>>>>>>>> restoring from the changelog. >>>>>>>>> >>>>>>>>> Basically, this is heavy-handed way to deal with TTL at a >>>>> segment-level, >>>>>>>>> instead of at an individual record level. >>>>>>>>> >>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak < >>>>> jan.filip...@trivago.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Will that work? I expected it to blow up with ClassCastException >>> or >>>>>>>>>> similar. >>>>>>>>>> >>>>>>>>>> You either would have to specify the window you fetch/put or >>> iterate >>>>>>>>>> across all windows the key was found in right? >>>>>>>>>> >>>>>>>>>> I just hope the window-store doesn't check stream-time under the >>>>> hoods >>>>>>>>>> that would be a questionable interface. >>>>>>>>>> >>>>>>>>>> If it does: did you see my comment on checking all the windows >>>>> earlier? >>>>>>>>>> that would be needed to actually give reasonable time gurantees. >>>>>>>>>> >>>>>>>>>> Best >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote: >>>>>>>>>>> Hi Jan >>>>>>>>>>> >>>>>>>>>>> Check for " highwaterMat " in the PR. I only changed the state >>>>> store, >>>>>>>>>> not >>>>>>>>>>> the ProcessorSupplier. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Adam >>>>>>>>>>> >>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak < >>>>>>> jan.filip...@trivago.com >>>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote: >>>>>>>>>>>> >>>>>>>>>>>>> @Guozhang >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the information. This is indeed something that >>> will be >>>>>>>>>>>>> extremely >>>>>>>>>>>>> useful for this KIP. >>>>>>>>>>>>> >>>>>>>>>>>>> @Jan >>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be >>>>> moving >>>>>>>>>> ahead >>>>>>>>>>>>> with an implementation using reshuffle/groupBy solution as you >>>>>>>>>> propose. >>>>>>>>>>>>> That being said, if you wish to implement it yourself off of >>> my >>>>>>>>>> current PR >>>>>>>>>>>>> and submit it as a competitive alternative, I would be more >>> than >>>>>>>>>> happy to >>>>>>>>>>>>> help vet that as an alternate solution. As it stands right >>> now, >>>>> I do >>>>>>>>>> not >>>>>>>>>>>>> really have more time to invest into alternatives without >>> there >>>>>>> being >>>>>>>>>> a >>>>>>>>>>>>> strong indication from the binding voters which they would >>>>> prefer. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> Hey, total no worries. I think I personally gave up on the >>> streams >>>>>>> DSL >>>>>>>>>> for >>>>>>>>>>>> some time already, otherwise I would have pulled this KIP >>> through >>>>>>>>>> already. 
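Constructing the windowed store Adam describes a few messages up looks roughly like this through the public API (store name, serdes and durations are illustrative, and the overloads shown are the 2.1-style Duration variants of the Stores factory methods):

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;

    // Entries expire wholesale with their segment once they age past retention,
    // which is the segment-level TTL described above.
    StoreBuilder<WindowStore<String, Long>> highwaterBuilder =
            Stores.windowStoreBuilder(
                    Stores.persistentWindowStore(
                            "highwater-mark-store",
                            Duration.ofHours(48),  // retention period
                            Duration.ofHours(24),  // window size
                            false),                // don't retain duplicates
                    Serdes.String(),
                    Serdes.Long());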
>>>>>>>>>>>> I am currently reimplementing my own DSL based on PAPI. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> I will look at finishing up my PR with the windowed state >>> store >>>>> in >>>>>>> the >>>>>>>>>>>>> next >>>>>>>>>>>>> week or so, exercising it via tests, and then I will come back >>>>> for >>>>>>>>>> final >>>>>>>>>>>>> discussions. In the meantime, I hope that any of the binding >>>>> voters >>>>>>>>>> could >>>>>>>>>>>>> take a look at the KIP in the wiki. I have updated it >>> according >>>>> to >>>>>>> the >>>>>>>>>>>>> latest plan: >>>>>>>>>>>>> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+ >>>>>>>>>>>>> Support+non-key+joining+in+KTable >>>>>>>>>>>>> >>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This >>>>> could >>>>>>> be >>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are >>> completed. >>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527 >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> >>>>>>>>>>>>> Adam >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated >>> in >>>>> the >>>>>>>>>> PR? >>>>>>>>>>>> expected it to change to Windowed<K>,Long Missing something? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang < >>>>> wangg...@gmail.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, >>>>> as it >>>>>>>>>> is >>>>>>>>>>>>>> for >>>>>>>>>>>>>> corresponding changelog mechanisms. But as part of KIP-258 >>> we do >>>>>>>>>> want to >>>>>>>>>>>>>> have "handling out-of-order data for source KTable" such that >>>>>>>>>> instead of >>>>>>>>>>>>>> blindly apply the updates to the materialized store, i.e. >>>>> following >>>>>>>>>>>>>> offset >>>>>>>>>>>>>> ordering, we will reject updates that are older than the >>> current >>>>>>>>>> key's >>>>>>>>>>>>>> timestamps, i.e. following timestamp ordering. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang < >>>>>>> wangg...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hello Adam, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. >>> the >>>>>>> high >>>>>>>>>>>>>>> watermark store, now altered to be replaced with a window >>>>> store), >>>>>>> I >>>>>>>>>>>>>>> think >>>>>>>>>>>>>>> another current on-going KIP may actually help: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store >>> (i.e. >>>>> only >>>>>>>>>> for >>>>>>>>>>>>>>> non-windowed KTable), and then one of its usage, as >>> described >>>>> in >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that >>> we >>>>> can >>>>>>>>>> then >>>>>>>>>>>>>>> "reject" updates from the source topics if its timestamp is >>>>>>> smaller >>>>>>>>>> than >>>>>>>>>>>>>>> the current key's latest update timestamp. 
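To make that rejection rule concrete, a hand-rolled sketch (this is not the KIP-258 implementation, which keeps the timestamp inside the store itself; here a separate KeyValueStore<K, Long> of latest applied timestamps is assumed, along with the usual ProcessorContext):

    // Inside Processor<K, V>.process(), with two attached stores:
    //   latestTimestampStore : KeyValueStore<K, Long> (latest applied ts per key)
    //   materializedStore    : KeyValueStore<K, V>    (the table state itself)
    public void process(final K key, final V value) {
        final Long latestApplied = latestTimestampStore.get(key);
        if (latestApplied != null && context.timestamp() < latestApplied) {
            return;                           // older than what was applied: reject
        }
        latestTimestampStore.put(key, context.timestamp());
        materializedStore.put(key, value);
        context.forward(key, value);          // only in-order updates propagate
    }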
I think it is >>> very >>>>>>>>>> similar to >>>>>>>>>>>>>>> what you have in mind for high watermark based filtering, >>> while >>>>>>> you >>>>>>>>>> only >>>>>>>>>>>>>>> need to make sure that the timestamps of the joining records >>>>> are >>>>>>>>>>>>>>> >>>>>>>>>>>>>> correctly >>>>>>>>>>>>>> >>>>>>>>>>>>>>> inherited though the whole topology to the final stage. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence >>>>> non-windowed >>>>>>>>>> KTables >>>>>>>>>>>>>>> only, but for windowed KTables we do not really have a good >>>>>>> support >>>>>>>>>> for >>>>>>>>>>>>>>> their joins anyways ( >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7107) >>>>>>>>>>>>>>> I >>>>>>>>>>>>>>> think we can just consider non-windowed KTable-KTable >>> non-key >>>>>>> joins >>>>>>>>>> for >>>>>>>>>>>>>>> now. In which case, KIP-258 should help. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak < >>>>>>>>>> jan.filip...@trivago.com >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Guozhang >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Current highwater mark implementation would grow endlessly >>>>> based >>>>>>>>>> on >>>>>>>>>>>>>>>>> primary key of original event. It is a pair of (<this >>> table >>>>>>>>>> primary >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> key>, >>>>>>>>>>>>>> >>>>>>>>>>>>>>> <highest offset seen for that key>). This is used to >>>>> differentiate >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> between >>>>>>>>>>>>>> >>>>>>>>>>>>>>> late arrivals and new updates. My newest proposal would be >>> to >>>>>>>>>> replace >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> it >>>>>>>>>>>>>> >>>>>>>>>>>>>>> with a Windowed state store of Duration N. This would allow >>> the >>>>>>> same >>>>>>>>>>>>>>>>> behaviour, but cap the size based on time. This should >>> allow >>>>> for >>>>>>>>>> all >>>>>>>>>>>>>>>>> late-arriving events to be processed, and should be >>>>> customizable >>>>>>>>>> by >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> user to tailor to their own needs (ie: perhaps just 10 >>>>> minutes >>>>>>> of >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> window, >>>>>>>>>>>>>> >>>>>>>>>>>>>>> or perhaps 7 days...). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the trick here. >>>>> Even >>>>>>>>>> if I >>>>>>>>>>>>>>>> would still like to see the automatic repartitioning >>> optional >>>>>>>>>> since I >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> would >>>>>>>>>>>>>> >>>>>>>>>>>>>>> just reshuffle again. With windowed store I am a little bit >>>>>>>>>> sceptical >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> about >>>>>>>>>>>>>> >>>>>>>>>>>>>>> how to determine the window. So esentially one could run >>> into >>>>>>>>>> problems >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> when >>>>>>>>>>>>>> >>>>>>>>>>>>>>> the rapid change happens near a window border. I will check >>> you >>>>>>>>>>>>>>>> implementation in detail, if its problematic, we could >>> still >>>>>>> check >>>>>>>>>>>>>>>> _all_ >>>>>>>>>>>>>>>> windows on read with not to bad performance impact I guess. >>>>> Will >>>>>>>>>> let >>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>> know if the implementation would be correct as is. I >>> wouldn't >>>>> not >>>>>>>>>> like >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>>>> assume that: offset(A) < offset(B) => timestamp(A) < >>>>>>> timestamp(B). 
>>>>>>>>>> I >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> think >>>>>>>>>>>>>> >>>>>>>>>>>>>>> we can't expect that. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> @Jan >>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the >>>>>>>>>> diagram, it >>>>>>>>>>>>>>>>> did really help. You are correct that I do not have the >>>>> original >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> primary >>>>>>>>>>>>>> >>>>>>>>>>>>>>> key available, and I can see that if it was available then >>> you >>>>>>>>>> would be >>>>>>>>>>>>>>>>> able to add and remove events from the Map. That being >>> said, >>>>> I >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> encourage >>>>>>>>>>>>>> >>>>>>>>>>>>>>> you to finish your diagrams / charts just for clarity for >>>>> everyone >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> else. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But >>> I >>>>>>>>>> understand >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the original primary >>>>> key, >>>>>>> We >>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>> join and Group by implemented our own in PAPI and basically >>>>> not >>>>>>>>>> using >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> any >>>>>>>>>>>>>> >>>>>>>>>>>>>>> DSL (Just the abstraction). Completely missed that in >>> original >>>>> DSL >>>>>>>>>> its >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> not >>>>>>>>>>>>>> >>>>>>>>>>>>>>> there and just assumed it. total brain mess up on my end. >>> Will >>>>>>>>>> finish >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> the >>>>>>>>>>>>>> >>>>>>>>>>>>>>> chart as soon as i get a quite evening this week. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> My follow up question for you is, won't the Map stay inside >>>>> the >>>>>>>>>> State >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Store indefinitely after all of the changes have >>> propagated? >>>>>>> Isn't >>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> effectively the same as a highwater mark state store? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thing is that if the map is empty, substractor is gonna >>>>> return >>>>>>>>>> `null` >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> and >>>>>>>>>>>>>> >>>>>>>>>>>>>>> the key is removed from the keyspace. But there is going to >>> be >>>>> a >>>>>>>>>> store >>>>>>>>>>>>>>>> 100%, the good thing is that I can use this store directly >>> for >>>>>>>>>>>>>>>> materialize() / enableSendingOldValues() is a regular >>> store, >>>>>>>>>> satisfying >>>>>>>>>>>>>>>> all gurantees needed for further groupby / join. The >>> Windowed >>>>>>>>>> store is >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> not >>>>>>>>>>>>>> >>>>>>>>>>>>>>> keeping the values, so for the next statefull operation we >>>>> would >>>>>>>>>>>>>>>> need to instantiate an extra store. or we have the window >>>>> store >>>>>>>>>> also >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> have >>>>>>>>>>>>>> >>>>>>>>>>>>>>> the values then. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Long story short. if we can flip in a custom group by >>> before >>>>>>>>>>>>>>>> repartitioning to the original primary key i think it would >>>>> help >>>>>>>>>> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> users >>>>>>>>>>>>>> >>>>>>>>>>>>>>> big time in building efficient apps. Given the original >>> primary >>>>>>> key >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> issue I >>>>>>>>>>>>>> >>>>>>>>>>>>>>> understand that we do not have a solid foundation to build >>> on. >>>>>>>>>>>>>>>> Leaving primary key carry along to the user. very >>>>> unfortunate. I >>>>>>>>>> could >>>>>>>>>>>>>>>> understand the decision goes like that. 
I do not think its >>> a >>>>> good >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> decision. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>> Adam >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre < >>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto: >>>>> dumbreprajakta...@gmail.com >>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> please remove me from this group >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak >>>>>>>>>>>>>>>>> <jan.filip...@trivago.com <mailto: >>>>>>> jan.filip...@trivago.com >>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > Hi Adam, >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > give me some time, will make such a chart. last >>>>> time i >>>>>>>>>> didn't >>>>>>>>>>>>>>>>> get along >>>>>>>>>>>>>>>>> > well with giphy and ruined all your charts. >>>>>>>>>>>>>>>>> > Hopefully i can get it done today >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > On 08.09.2018 16:00, Adam Bellemare wrote: >>>>>>>>>>>>>>>>> > > Hi Jan >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > I have included a diagram of what I attempted >>> on >>>>> the >>>>>>>>>> KIP. >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su >>>>>>>>>>>>>>>>> >>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining >>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate >>>>>>>>>>>>>>>>> < >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S >>>>>>>>>>>>>>>>> >>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin >>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate> >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > I attempted this back at the start of my own >>>>>>>>>> implementation >>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> > > solution, and since I could not get it to >>> work I >>>>> have >>>>>>>>>> since >>>>>>>>>>>>>>>>> discarded the >>>>>>>>>>>>>>>>> > > code. At this point in time, if you wish to >>>>> continue >>>>>>>>>> pursuing >>>>>>>>>>>>>>>>> for your >>>>>>>>>>>>>>>>> > > groupBy solution, I ask that you please >>> create a >>>>>>>>>> diagram on >>>>>>>>>>>>>>>>> the KIP >>>>>>>>>>>>>>>>> > > carefully explaining your solution. Please >>> feel >>>>> free >>>>>>> to >>>>>>>>>> use >>>>>>>>>>>>>>>>> the image I >>>>>>>>>>>>>>>>> > > just posted as a starting point. I am having >>>>> trouble >>>>>>>>>>>>>>>>> understanding your >>>>>>>>>>>>>>>>> > > explanations but I think that a carefully >>>>> constructed >>>>>>>>>> diagram >>>>>>>>>>>>>>>>> will clear >>>>>>>>>>>>>>>>> > up >>>>>>>>>>>>>>>>> > > any misunderstandings. Alternately, please >>> post a >>>>>>>>>>>>>>>>> comprehensive PR with >>>>>>>>>>>>>>>>> > > your solution. I can only guess at what you >>>>> mean, and >>>>>>>>>> since I >>>>>>>>>>>>>>>>> value my >>>>>>>>>>>>>>>>> > own >>>>>>>>>>>>>>>>> > > time as much as you value yours, I believe it >>> is >>>>> your >>>>>>>>>>>>>>>>> responsibility to >>>>>>>>>>>>>>>>> > > provide an implementation instead of me >>> trying to >>>>>>> guess. 
>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > Adam >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak >>>>>>>>>>>>>>>>> <jan.filip...@trivago.com <mailto: >>>>>>> jan.filip...@trivago.com >>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > > wrote: >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > >> Hi James, >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> nice to see you beeing interested. kafka >>>>> streams at >>>>>>>>>> this >>>>>>>>>>>>>>>>> point supports >>>>>>>>>>>>>>>>> > >> all sorts of joins as long as both streams >>> have >>>>> the >>>>>>>>>> same >>>>>>>>>>>>>>>>> key. >>>>>>>>>>>>>>>>> > >> Adam is currently implementing a join where a >>>>> KTable >>>>>>>>>> and a >>>>>>>>>>>>>>>>> KTable can >>>>>>>>>>>>>>>>> > have >>>>>>>>>>>>>>>>> > >> a one to many relation ship (1:n). We exploit >>>>> that >>>>>>>>>> rocksdb >>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> a >>>>>>>>>>>>>> >>>>>>>>>>>>>>> > >> datastore that keeps data sorted (At least >>>>> exposes an >>>>>>>>>> API to >>>>>>>>>>>>>>>>> access the >>>>>>>>>>>>>>>>> > >> stored data in a sorted fashion). >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> I think the technical caveats are well >>>>> understood >>>>>>> now >>>>>>>>>> and we >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> are >>>>>>>>>>>>>> >>>>>>>>>>>>>>> > basically >>>>>>>>>>>>>>>>> > >> down to philosophy and API Design ( when Adam >>>>> sees >>>>>>> my >>>>>>>>>> newest >>>>>>>>>>>>>>>>> message). >>>>>>>>>>>>>>>>> > >> I have a lengthy track record of loosing >>> those >>>>> kinda >>>>>>>>>>>>>>>>> arguments within >>>>>>>>>>>>>>>>> > the >>>>>>>>>>>>>>>>> > >> streams community and I have no clue why. So >>> I >>>>>>>>>> literally >>>>>>>>>>>>>>>>> can't wait for >>>>>>>>>>>>>>>>> > you >>>>>>>>>>>>>>>>> > >> to churn through this thread and give you >>>>> opinion on >>>>>>>>>> how we >>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>> > design >>>>>>>>>>>>>>>>> > >> the return type of the oneToManyJoin and how >>>>> many >>>>>>>>>> power we >>>>>>>>>>>>>>>>> want to give >>>>>>>>>>>>>>>>> > to >>>>>>>>>>>>>>>>> > >> the user vs "simplicity" (where simplicity >>> isn't >>>>>>>>>> really that >>>>>>>>>>>>>>>>> as users >>>>>>>>>>>>>>>>> > still >>>>>>>>>>>>>>>>> > >> need to understand it I argue) >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> waiting for you to join in on the discussion >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> Best Jan >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >> On 07.09.2018 15:49, James Kwan wrote: >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> > >>> I am new to this group and I found this >>> subject >>>>>>>>>>>>>>>>> interesting. Sounds >>>>>>>>>>>>>>>>> > like >>>>>>>>>>>>>>>>> > >>> you guys want to implement a join table of >>> two >>>>>>>>>> streams? Is >>>>>>>>>>>>>>>>> there >>>>>>>>>>>>>>>>> > somewhere >>>>>>>>>>>>>>>>> > >>> I can see the original requirement or >>> proposal? 
>>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak >>>>>>>>>>>>>>>>> <jan.filip...@trivago.com <mailto: >>>>>>> jan.filip...@trivago.com >>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > >>>> wrote: >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> On 05.09.2018 22:17, Adam Bellemare wrote: >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>>> I'm currently testing using a Windowed >>> Store >>>>> to >>>>>>>>>> store the >>>>>>>>>>>>>>>>> highwater >>>>>>>>>>>>>>>>> > >>>>> mark. >>>>>>>>>>>>>>>>> > >>>>> By all indications this should work fine, >>>>> with >>>>>>> the >>>>>>>>>> caveat >>>>>>>>>>>>>>>>> being that >>>>>>>>>>>>>>>>> > it >>>>>>>>>>>>>>>>> > >>>>> can >>>>>>>>>>>>>>>>> > >>>>> only resolve out-of-order arrival for up >>> to >>>>> the >>>>>>>>>> size of >>>>>>>>>>>>>>>>> the window >>>>>>>>>>>>>>>>> > (ie: >>>>>>>>>>>>>>>>> > >>>>> 24h, 72h, etc). This would remove the >>>>> possibility >>>>>>>>>> of it >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> being >>>>>>>>>>>>>> >>>>>>>>>>>>>>> > unbounded >>>>>>>>>>>>>>>>> > >>>>> in >>>>>>>>>>>>>>>>> > >>>>> size. >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> With regards to Jan's suggestion, I >>> believe >>>>> this >>>>>>> is >>>>>>>>>> where >>>>>>>>>>>>>>>>> we will >>>>>>>>>>>>>>>>> > have >>>>>>>>>>>>>>>>> > >>>>> to >>>>>>>>>>>>>>>>> > >>>>> remain in disagreement. While I do not >>>>> disagree >>>>>>>>>> with your >>>>>>>>>>>>>>>>> statement >>>>>>>>>>>>>>>>> > >>>>> about >>>>>>>>>>>>>>>>> > >>>>> there likely to be additional joins done >>> in a >>>>>>>>>> real-world >>>>>>>>>>>>>>>>> workflow, I >>>>>>>>>>>>>>>>> > do >>>>>>>>>>>>>>>>> > >>>>> not >>>>>>>>>>>>>>>>> > >>>>> see how you can conclusively deal with >>>>>>> out-of-order >>>>>>>>>>>>>>>>> arrival >>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>> > >>>>> foreign-key >>>>>>>>>>>>>>>>> > >>>>> changes and subsequent joins. I have >>>>> attempted >>>>>>> what >>>>>>>>>> I >>>>>>>>>>>>>>>>> think you have >>>>>>>>>>>>>>>>> > >>>>> proposed (without a high-water, using >>>>> groupBy and >>>>>>>>>> reduce) >>>>>>>>>>>>>>>>> and found >>>>>>>>>>>>>>>>> > >>>>> that if >>>>>>>>>>>>>>>>> > >>>>> the foreign key changes too quickly, or >>> the >>>>> load >>>>>>> on >>>>>>>>>> a >>>>>>>>>>>>>>>>> stream thread >>>>>>>>>>>>>>>>> > is >>>>>>>>>>>>>>>>> > >>>>> too >>>>>>>>>>>>>>>>> > >>>>> high, the joined messages will arrive >>>>>>> out-of-order >>>>>>>>>> and be >>>>>>>>>>>>>>>>> incorrectly >>>>>>>>>>>>>>>>> > >>>>> propagated, such that an intermediate >>> event >>>>> is >>>>>>>>>>>>>>>>> represented >>>>>>>>>>>>>>>>> as the >>>>>>>>>>>>>>>>> > final >>>>>>>>>>>>>>>>> > >>>>> event. >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>> Can you shed some light on your groupBy >>>>>>>>>> implementation. >>>>>>>>>>>>>>>>> There must be >>>>>>>>>>>>>>>>> > >>>> some sort of flaw in it. >>>>>>>>>>>>>>>>> > >>>> I have a suspicion where it is, I would >>> just >>>>> like >>>>>>> to >>>>>>>>>>>>>>>>> confirm. The idea >>>>>>>>>>>>>>>>> > >>>> is bullet proof and it must be >>>>>>>>>>>>>>>>> > >>>> an implementation mess up. I would like to >>>>> clarify >>>>>>>>>> before >>>>>>>>>>>>>>>>> we draw a >>>>>>>>>>>>>>>>> > >>>> conclusion. 
>>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> Repartitioning the scattered events >>> back to >>>>>>> their >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> original >>>>>>>>>>>>>> >>>>>>>>>>>>>>> > >>>>> partitions is the only way I know how to >>>>>>> conclusively >>>>>>>>>> deal >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>> > >>>>> out-of-order events in a given time frame, >>>>> and to >>>>>>>>>> ensure >>>>>>>>>>>>>>>>> that the >>>>>>>>>>>>>>>>> > data >>>>>>>>>>>>>>>>> > >>>>> is >>>>>>>>>>>>>>>>> > >>>>> eventually consistent with the input >>> events. >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> If you have some code to share that >>>>> illustrates >>>>>>> your >>>>>>>>>>>>>>>>> approach, I >>>>>>>>>>>>>>>>> > would >>>>>>>>>>>>>>>>> > >>>>> be >>>>>>>>>>>>>>>>> > >>>>> very grateful as it would remove any >>>>>>>>>> misunderstandings >>>>>>>>>>>>>>>>> that I may >>>>>>>>>>>>>>>>> > have. >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>> ah okay you were looking for my code. I >>> don't >>>>> have >>>>>>>>>>>>>>>>> something easily >>>>>>>>>>>>>>>>> > >>>> readable here as its bloated with >>> OO-patterns. >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> its anyhow trivial: >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> @Override >>>>>>>>>>>>>>>>> > >>>> public T apply(K aggKey, V value, T >>>>>>> aggregate) >>>>>>>>>>>>>>>>> > >>>> { >>>>>>>>>>>>>>>>> > >>>> Map<U, V> currentStateAsMap = >>>>>>>>>> asMap(aggregate); >>>>>>>>>>>>>>>>> << >>>>>>>>>>>>>>>>> imaginary >>>>>>>>>>>>>>>>> > >>>> U toModifyKey = >>> mapper.apply(value); >>>>>>>>>>>>>>>>> > >>>> << this is the place where >>> people >>>>>>>>>> actually >>>>>>>>>>>>>>>>> gonna have >>>>>>>>>>>>>>>>> > issues >>>>>>>>>>>>>>>>> > >>>> and why you probably couldn't do it. we >>> would >>>>> need >>>>>>>>>> to find >>>>>>>>>>>>>>>>> a solution >>>>>>>>>>>>>>>>> > here. >>>>>>>>>>>>>>>>> > >>>> I didn't realize that yet. >>>>>>>>>>>>>>>>> > >>>> << we propagate the field in >>> the >>>>>>>>>> joiner, so >>>>>>>>>>>>>>>>> that we can >>>>>>>>>>>>>>>>> > pick >>>>>>>>>>>>>>>>> > >>>> it up in an aggregate. Probably you have >>> not >>>>>>> thought >>>>>>>>>> of >>>>>>>>>>>>>>>>> this in your >>>>>>>>>>>>>>>>> > >>>> approach right? >>>>>>>>>>>>>>>>> > >>>> << I am very open to find a >>>>> generic >>>>>>>>>> solution >>>>>>>>>>>>>>>>> here. In my >>>>>>>>>>>>>>>>> > >>>> honest opinion this is broken in >>>>>>> KTableImpl.GroupBy >>>>>>>>>> that >>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>> looses >>>>>>>>>>>>>>>>> > the keys >>>>>>>>>>>>>>>>> > >>>> and only maintains the aggregate key. >>>>>>>>>>>>>>>>> > >>>> << I abstracted it away back >>>>> then way >>>>>>>>>> before >>>>>>>>>>>>>>>>> i >>>>>>>>>>>>>>>>> was >>>>>>>>>>>>>>>>> > thinking >>>>>>>>>>>>>>>>> > >>>> of oneToMany join. That is why I didn't >>>>> realize >>>>>>> its >>>>>>>>>>>>>>>>> significance here. >>>>>>>>>>>>>>>>> > >>>> << Opinions? 
>>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> for (V m : current) >>>>>>>>>>>>>>>>> > >>>> { >>>>>>>>>>>>>>>>> > >>>> currentStateAsMap.put(mapper.apply(m), m); >>>>>>>>>>>>>>>>> > >>>> } >>>>>>>>>>>>>>>>> > >>>> if (isAdder) >>>>>>>>>>>>>>>>> > >>>> { >>>>>>>>>>>>>>>>> > >>>> currentStateAsMap.put(toModifyKey, value); >>>>>>>>>>>>>>>>> > >>>> } >>>>>>>>>>>>>>>>> > >>>> else >>>>>>>>>>>>>>>>> > >>>> { >>>>>>>>>>>>>>>>> > >>>> currentStateAsMap.remove(toModifyKey); >>>>>>>>>>>>>>>>> > >>>> if(currentStateAsMap.isEmpty()){ >>>>>>>>>>>>>>>>> > >>>> return null; >>>>>>>>>>>>>>>>> > >>>> } >>>>>>>>>>>>>>>>> > >>>> } >>>>>>>>>>>>>>>>> > >>>> retrun >>>>> asAggregateType(currentStateAsMap) >>>>>>>>>>>>>>>>> > >>>> } >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> >>>>>>>>>>>>>>>>> > >>>> Thanks, >>>>>>>>>>>>>>>>> > >>>>> Adam >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan >>> Filipiak >>>>> < >>>>>>>>>>>>>>>>> > jan.filip...@trivago.com <mailto: >>>>>>> jan.filip...@trivago.com >>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > >>>>> wrote: >>>>>>>>>>>>>>>>> > >>>>> >>>>>>>>>>>>>>>>> > >>>>> Thanks Adam for bringing Matthias to >>> speed! >>>>>>>>>>>>>>>>> > >>>>>> about the differences. I think re-keying >>>>> back >>>>>>>>>> should be >>>>>>>>>>>>>>>>> optional at >>>>>>>>>>>>>>>>> > >>>>>> best. >>>>>>>>>>>>>>>>> > >>>>>> I would say we return a KScatteredTable >>> with >>>>>>>>>> reshuffle() >>>>>>>>>>>>>>>>> returning >>>>>>>>>>>>>>>>> > >>>>>> KTable<originalKey,Joined> to make the >>>>> backwards >>>>>>>>>>>>>>>>> repartitioning >>>>>>>>>>>>>>>>> > >>>>>> optional. >>>>>>>>>>>>>>>>> > >>>>>> I am also in a big favour of doing the >>> out >>>>> of >>>>>>> order >>>>>>>>>>>>>>>>> processing using >>>>>>>>>>>>>>>>> > >>>>>> group >>>>>>>>>>>>>>>>> > >>>>>> by instead high water mark tracking. >>>>>>>>>>>>>>>>> > >>>>>> Just because unbounded growth is just >>> scary >>>>> + It >>>>>>>>>> saves >>>>>>>>>>>>>>>>> us >>>>>>>>>>>>>>>>> the header >>>>>>>>>>>>>>>>> > >>>>>> stuff. >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> I think the abstraction of always >>>>> repartitioning >>>>>>>>>> back is >>>>>>>>>>>>>>>>> just not so >>>>>>>>>>>>>>>>> > >>>>>> strong. Like the work has been done >>> before >>>>> we >>>>>>>>>> partition >>>>>>>>>>>>>>>>> back and >>>>>>>>>>>>>>>>> > >>>>>> grouping >>>>>>>>>>>>>>>>> > >>>>>> by something else afterwards is really >>>>> common. >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> On 05.09.2018 13:49, Adam Bellemare >>> wrote: >>>>>>>>>>>>>>>>> > >>>>>> >>>>>>>>>>>>>>>>> > >>>>>> Hi Matthias >>>>>>>>>>>>>>>>> > >>>>>>> Thank you for your feedback, I do >>>>> appreciate >>>>>>> it! >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> While name spacing would be possible, it >>>>> would >>>>>>>>>> require >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> > deserialize >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> user headers what implies a runtime >>>>> overhead. >>>>>>> I >>>>>>>>>> would >>>>>>>>>>>>>>>>> suggest to >>>>>>>>>>>>>>>>> > no >>>>>>>>>>>>>>>>> > >>>>>>>> namespace for now to avoid the >>> overhead. 
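Jan's apply() sketch quoted above, restated in compilable form (asMap/asAggregateType collapse away once the aggregate simply is a Map; mapper is the still-open piece that must recover the original primary key from the value):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Adder/subtractor for a groupBy + reduce that keeps one entry per original
    // primary key U, so a late retraction removes exactly the record it refers to.
    static <U, V> Map<U, V> apply(final V value, final Map<U, V> aggregate,
                                  final boolean isAdder,
                                  final Function<V, U> mapper) {
        final Map<U, V> state =
                (aggregate == null) ? new HashMap<>() : new HashMap<>(aggregate);
        final U toModifyKey = mapper.apply(value);  // the original primary key
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null;                        // tombstone: key leaves keyspace
            }
        }
        return state;
    }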
>>>>> If >>>>>>> this >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> becomes a >>>>>>>>>>>>>> >>>>>>>>>>>>>>> > problem in >>>>>>>>>>>>>>>>> > >>>>>>>> the future, we can still add name >>> spacing >>>>>>> later >>>>>>>>>> on. >>>>>>>>>>>>>>>>> > >>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> Agreed. I will go with using a reserved >>>>> string >>>>>>>>>> and >>>>>>>>>>>>>>>>> document it. >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> My main concern about the design it the >>>>> type of >>>>>>>>>> the >>>>>>>>>>>>>>>>> result KTable: >>>>>>>>>>>>>>>>> > If >>>>>>>>>>>>>>>>> > >>>>>>> I >>>>>>>>>>>>>>>>> > >>>>>>> understood the proposal correctly, >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> In your example, you have table1 and >>> table2 >>>>>>>>>> swapped. >>>>>>>>>>>>>>>>> Here is how it >>>>>>>>>>>>>>>>> > >>>>>>> works >>>>>>>>>>>>>>>>> > >>>>>>> currently: >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> 1) table1 has the records that contain >>> the >>>>>>>>>> foreign key >>>>>>>>>>>>>>>>> within their >>>>>>>>>>>>>>>>> > >>>>>>> value. >>>>>>>>>>>>>>>>> > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, >>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>, >>>>>>>>>>>>>>>>> > >>>>>>> <c,(fk=B,bar=3)> >>>>>>>>>>>>>>>>> > >>>>>>> table2 input stream: <A,X>, <B,Y> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> 2) A Value mapper is required to extract >>>>> the >>>>>>>>>> foreign >>>>>>>>>>>>>>>>> key. >>>>>>>>>>>>>>>>> > >>>>>>> table1 foreign key mapper: ( value => >>>>> value.fk >>>>>>>>>>>>>>>>> <http://value.fk> ) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> The mapper is applied to each element in >>>>>>> table1, >>>>>>>>>> and a >>>>>>>>>>>>>>>>> new combined >>>>>>>>>>>>>>>>> > >>>>>>> key is >>>>>>>>>>>>>>>>> > >>>>>>> made: >>>>>>>>>>>>>>>>> > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, >>> <A-b, >>>>>>>>>>>>>>>>> (fk=A,bar=2)>, >>>>>>>>>>>>>>>>> <B-c, >>>>>>>>>>>>>>>>> > >>>>>>> (fk=B,bar=3)> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> 3) The rekeyed events are copartitioned >>>>> with >>>>>>>>>> table2: >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> a) Stream Thread with Partition 0: >>>>>>>>>>>>>>>>> > >>>>>>> RepartitionedTable1: <A-a, >>> (fk=A,bar=1)>, >>>>> <A-b, >>>>>>>>>>>>>>>>> (fk=A,bar=2)> >>>>>>>>>>>>>>>>> > >>>>>>> Table2: <A,X> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> b) Stream Thread with Partition 1: >>>>>>>>>>>>>>>>> > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)> >>>>>>>>>>>>>>>>> > >>>>>>> Table2: <B,Y> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> 4) From here, they can be joined >>> together >>>>>>> locally >>>>>>>>>> by >>>>>>>>>>>>>>>>> applying the >>>>>>>>>>>>>>>>> > >>>>>>> joiner >>>>>>>>>>>>>>>>> > >>>>>>> function. >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> At this point, Jan's design and my >>> design >>>>>>>>>> deviate. My >>>>>>>>>>>>>>>>> design goes >>>>>>>>>>>>>>>>> > on >>>>>>>>>>>>>>>>> > >>>>>>> to >>>>>>>>>>>>>>>>> > >>>>>>> repartition the data post-join and >>> resolve >>>>>>>>>> out-of-order >>>>>>>>>>>>>>>>> arrival of >>>>>>>>>>>>>>>>> > >>>>>>> records, >>>>>>>>>>>>>>>>> > >>>>>>> finally returning the data keyed just >>> the >>>>>>>>>> original key. 
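The intermediate keying from steps 2) and 3) above, sketched as a type (CombinedKey is named in the KIP; this particular layout is a guess):

    // Partitioning is driven by the foreignKey prefix, so all table1 records that
    // reference the same table2 row land on that row's partition.
    class CombinedKey<KO, K> {
        final KO foreignKey;  // co-partitioned with table2
        final K primaryKey;   // original table1 key, needed to map results back

        CombinedKey(final KO foreignKey, final K primaryKey) {
            this.foreignKey = foreignKey;
            this.primaryKey = primaryKey;
        }
    }

    // <a,(fk=A,bar=1)>  ->  <CombinedKey(A, a), (fk=A,bar=1)>  (partition of A)
    // <b,(fk=A,bar=2)>  ->  <CombinedKey(A, b), (fk=A,bar=2)>  (partition of A)
    // <c,(fk=B,bar=3)>  ->  <CombinedKey(B, c), (fk=B,bar=3)>  (partition of B)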
>>>>>>>>>>>>>>>>> I do not >>>>>>>>>>>>>>>>> > >>>>>>> expose >>>>>>>>>>>>>>>>> > >>>>>>> the >>>>>>>>>>>>>>>>> > >>>>>>> CombinedKey or any of the internals >>>>> outside of >>>>>>> the >>>>>>>>>>>>>>>>> joinOnForeignKey >>>>>>>>>>>>>>>>> > >>>>>>> function. This does make for larger >>>>> footprint, >>>>>>>>>> but it >>>>>>>>>>>>>>>>> removes all >>>>>>>>>>>>>>>>> > >>>>>>> agency >>>>>>>>>>>>>>>>> > >>>>>>> for resolving out-of-order arrivals and >>>>>>> handling >>>>>>>>>>>>>>>>> CombinedKeys from >>>>>>>>>>>>>>>>> > the >>>>>>>>>>>>>>>>> > >>>>>>> user. I believe that this makes the >>>>> function >>>>>>> much >>>>>>>>>>>>>>>>> easier >>>>>>>>>>>>>>>>> to use. >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> Let me know if this helps resolve your >>>>>>> questions, >>>>>>>>>> and >>>>>>>>>>>>>>>>> please feel >>>>>>>>>>>>>>>>> > >>>>>>> free to >>>>>>>>>>>>>>>>> > >>>>>>> add anything else on your mind. >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> Thanks again, >>>>>>>>>>>>>>>>> > >>>>>>> Adam >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, >>> Matthias J. >>>>>>> Sax < >>>>>>>>>>>>>>>>> > >>>>>>> matth...@confluent.io <mailto: >>>>>>>>>> matth...@confluent.io>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> wrote: >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>> Hi, >>>>>>>>>>>>>>>>> > >>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> I am just catching up on this thread. I >>>>> did >>>>>>> not >>>>>>>>>> read >>>>>>>>>>>>>>>>> everything so >>>>>>>>>>>>>>>>> > >>>>>>>> far, >>>>>>>>>>>>>>>>> > >>>>>>>> but want to share couple of initial >>>>> thoughts: >>>>>>>>>>>>>>>>> > >>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> Headers: I think there is a fundamental >>>>>>>>>> difference >>>>>>>>>>>>>>>>> between header >>>>>>>>>>>>>>>>> > >>>>>>>> usage >>>>>>>>>>>>>>>>> > >>>>>>>> in this KIP and KP-258. For 258, we add >>>>>>> headers >>>>>>>>>> to >>>>>>>>>>>>>>>>> changelog topic >>>>>>>>>>>>>>>>> > >>>>>>>> that >>>>>>>>>>>>>>>>> > >>>>>>>> are owned by Kafka Streams and nobody >>>>> else is >>>>>>>>>> supposed >>>>>>>>>>>>>>>>> to write >>>>>>>>>>>>>>>>> > into >>>>>>>>>>>>>>>>> > >>>>>>>> them. In fact, no user header are >>> written >>>>> into >>>>>>>>>> the >>>>>>>>>>>>>>>>> changelog topic >>>>>>>>>>>>>>>>> > >>>>>>>> and >>>>>>>>>>>>>>>>> > >>>>>>>> thus, there are not conflicts. >>>>>>>>>>>>>>>>> > >>>>>>>> >>>>>>>>>>>>>>>>> > >>>>>>>> Nevertheless, I don't see a big issue >>> with >>>>>>> using >>>>>>>>>>>>>>>>> headers within >>>>>>>>>>>>>>>>> > >>>>>>>> Streams. >>>>>>>>>>>>>>>>> > >>>>>>>> As long as we document it, we can have >>>>> some >>>>>>>>>> "reserved" >>>>>>>>>>>>>>>>> header keys >>>>>>>>>>>>>>>>> > >>>>>>>> and >>>>>>>>>>>>>>>>> > >>>>>>>> users are not allowed to use when >>>>> processing >>>>>>>>>> data with >>>>>>>>>>>>>>>>> Kafka >>>>>>>>>>>>>>>>> > Streams. >>>>>>>>>>>>>>>>> > >>>>>>>> IMHO, this should be ok. 
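What a documented reserved header could look like in practice (the header name and the long-offset payload are purely illustrative; the Headers calls are the standard client API):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.header.Headers;

    // A reserved, documented key that Streams owns on its internal topics.
    static final String RESERVED_OFFSET_HEADER = "__kip213-source-offset";

    static void stampOffset(final Headers headers, final long sourceOffset) {
        headers.remove(RESERVED_OFFSET_HEADER);  // users may not set this key
        headers.add(RESERVED_OFFSET_HEADER,
                ByteBuffer.allocate(Long.BYTES).putLong(sourceOffset).array());
    }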
>>> I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
>>> For internal and changelog topics, we can namespace all headers:
>>> * user-defined headers are namespaced as "external." + headerKey
>>> * internal headers are namespaced as "internal." + headerKey
>>
>> While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest no namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.
>>
>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>
>> KTable<K1,V1> table1 = ...
>> KTable<K2,V2> table2 = ...
>>
>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>
>> implies that the `joinedTable` has the same key as the left input table. IMHO, this does not work, because if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign-key join), the result table would only contain a single join result, not multiple ones.
>>
>> Example:
>>
>> table1 input stream: <A,X>
>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>
>> We use the table2 value as a foreign key referencing the table1 key (ie, "A" joins). If the result key is the same as the key of table1, this implies that the result can either be <A, join(X,1)> or <A, join(X,2)>, but not both. Because they share the same key, whatever result record we emit later overwrites the previous result.
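(Spelling out the overwrite in two lines, with a plain HashMap standing in for the result table's upsert semantics:)

import java.util.HashMap;
import java.util.Map;

public class KeyCollisionSketch {
    public static void main(String[] args) {
        // A plain map stands in for the result KTable's upsert semantics.
        Map<String, String> resultTable = new HashMap<>();
        resultTable.put("A", "join(X,1)"); // first join result
        resultTable.put("A", "join(X,2)"); // same key: overwrites join(X,1)
        System.out.println(resultTable);   // prints {A=join(X,2)} -- one result lost
    }
}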
>> This is the reason why Jan originally proposed to use a combination of both primary keys of the input tables as the key of the output table. This makes the keys of the output table unique, and we can store both results in the output table:
>>
>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>
>> Thoughts?
>>
>> -Matthias
>>
>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>
>>> Just one remark here. The high-watermark could be disregarded. The decision about the forward depends on the size of the aggregated map:
>>> only maps with exactly one element would be unpacked and forwarded; zero-element maps would be published as a delete. Any other count of map entries is in a "waiting for correct deletes to arrive" state.
>>>
>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>
>>>> It does look like I could replace the second repartition store and highwater store with a groupBy and reduce. However, it looks like I would still need to store the highwater value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...). This in effect is the same as the design I have now, just with the two tables merged together.
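(Jan's forward/withhold rule above fits in a few lines; the Forwarder interface and the surrounding types here are stand-ins for illustration, not Streams API:)

import java.util.Map;

public class ResolutionSketch {
    interface Forwarder<K, V> {
        void forward(K key, V value); // emit a result (null value = delete)
    }

    static <K, V> void resolve(K key, Map<?, V> aggregated, Forwarder<K, V> out) {
        if (aggregated.isEmpty()) {
            out.forward(key, null);       // 0 entries: publish a delete
        } else if (aggregated.size() == 1) {
            V only = aggregated.values().iterator().next();
            out.forward(key, only);       // exactly 1 entry: unpack and forward
        }
        // >1 entries: still "waiting for correct deletes to arrive" -- emit nothing
    }
}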