Hi Jan and Adam,

Wow, thanks for doing that test, Adam. Those results are encouraging.
Thanks for sharing your performance experience as well, Jan. I agree that avoiding unnecessary join outputs is especially important when the fan-out is so high. I suppose this could also be built into the implementation we're discussing, but it wouldn't have to be specified in the KIP (since it's an API-transparent optimization).

As far as whether or not to re-repartition the data, I didn't bring it up because it sounded like the two of you agreed to leave the KIP as-is, despite the disagreement. If you want my opinion, I feel that both approaches are reasonable. It sounds like Jan places more value on the potential for developers to optimize their topologies by re-using the intermediate nodes, whereas Adam places more value on having a single operator that people can use without extra steps at the end. Personally, although I find it exceptionally annoying when a framework gets in my way while I'm trying to optimize something, it seems better to go for a single operation:

* Encapsulating the internal transitions gives us significant latitude in the implementation (for example, joining only at the end rather than in the middle, to avoid extra data copying and out-of-order resolution; how we represent the first repartition keys (combined keys vs. value vectors); etc.). If we publish something like a KScatteredTable with the right-partitioned joined data, then the API pretty much locks in the implementation as well.

* The API seems simpler to understand and use. I do mean "seems"; if anyone wants to make the case that KScatteredTable is actually simpler, I think hypothetical usage code would help. From a relational algebra perspective, it seems like KTable.join(KTable) should produce a new KTable in all cases.

* That said, there might still be room in the API for a different operation like what Jan has proposed, to scatter a KTable and then do things like join, re-group, etc. from there... I'm not sure; I haven't thought through all the consequences yet.
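The relational-algebra point can be illustrated with a small table-level sketch using plain maps. All names here are illustrative, not the KIP's API; the sketch uses the left/right example tables from further down-thread, and it does an inner join purely for brevity (the KIP's join type is a separate open question).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class FkJoinSemanticsSketch {
    // A table-level sketch of the many-to-one foreign-key join under discussion:
    // each left row names a right-side key via an FK extractor, and the result
    // keeps the left table's keys -- a KTable in, a KTable out.
    static <K, V, KO, VO, VR> Map<K, VR> foreignKeyJoin(
            Map<K, V> left, Map<KO, VO> right,
            Function<V, KO> fkExtractor, BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new HashMap<>();
        left.forEach((k, v) -> {
            VO other = right.get(fkExtractor.apply(v));
            if (other != null) result.put(k, joiner.apply(v, other)); // inner join, for brevity
        });
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> left = Map.of(1, "A,xyz", 2, "B,asd");
        Map<String, String> right = Map.of("A", "EntityA", "B", "EntityB");
        Map<Integer, String> joined = foreignKeyJoin(
                left, right, v -> v.split(",")[0], (v, vo) -> v + "|" + vo);
        System.out.println(joined.get(1) + " " + joined.get(2));
        // prints: A,xyz|EntityA B,asd|EntityB
    }
}
```

The point of the sketch is only the shape of the operation: the result is keyed like the left input, which is what makes a single KTable-returning operator feel natural here.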
This is all just my opinion after thinking over the discussion so far...
-John

On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Updated the PR to take into account John's feedback.
>
> I did some preliminary testing of the performance of the prefixScan. I
> have attached the file, but I will also include the text in the body here
> for archival purposes (I am not sure what happens to attached files). I
> also updated the PR and the KIP accordingly.
>
> Summary: It scales exceptionally well for scanning large numbers of
> records. As Jan mentioned previously, the real issue would be more around
> processing the resulting records after obtaining them. For instance, it
> takes approximately ~80-120 ms to flush the buffer and a further ~35-85 ms
> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
> through the records just to generate a simple count takes ~40 times longer
> than the flush + scan combined.
>
> ============================================================================================
> Setup:
> ============================================================================================
> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> CPU: i7 2.2 GHz
>
> Note: I am using a slightly modified, directly accessible Kafka Streams
> RocksDB implementation (RocksDB.java, basically just avoiding the
> ProcessorContext). There are no modifications to the default RocksDB
> values provided in the 2.1/trunk release.
>
> keysize = 128 bytes
> valsize = 512 bytes
>
> Step 1: Write X positive-matching events (key = prefix + left-padded
> auto-incrementing integer)
> Step 2: Write 10X negative-matching events (key = left-padded
> auto-incrementing integer)
> Step 3: Perform flush
> Step 4: Perform prefixScan
> Step 5: Iterate through the returned Iterator and validate the count of
> expected events.
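The setup steps above can be sketched with a sorted in-memory map standing in for RocksDB (which likewise keeps keys in sorted order). This is an illustrative reconstruction, not Adam's actual test code, scaled down to X = 1k.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixScanSketch {
    // A sorted map stands in for RocksDB, which also stores keys in sorted order.
    static final NavigableMap<String, String> store = new TreeMap<>();

    // All entries whose key starts with the prefix, via one range query over the
    // sorted keyspace -- the same idea as a RocksDB prefix scan.
    static NavigableMap<String, String> prefixScan(String prefix) {
        // Smallest string greater than every key carrying this prefix: bump the
        // last character by one (sufficient here; real code must handle overflow).
        String end = prefix.substring(0, prefix.length() - 1)
                + (char) (prefix.charAt(prefix.length() - 1) + 1);
        return store.subMap(prefix, true, end, false);
    }

    public static void main(String[] args) {
        // Step 1: X positive-matching events, key = prefix + left-padded counter
        for (int i = 0; i < 1000; i++) store.put(String.format("FK1-%07d", i), "v");
        // Step 2: 10X negative-matching events, key = left-padded counter only
        for (int i = 0; i < 10000; i++) store.put(String.format("%07d", i), "v");
        // Steps 4-5: scan and count the matches
        System.out.println(prefixScan("FK1-").size()); // prints 1000
    }
}
```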
> ============================================================================================
> Results:
> ============================================================================================
> X = 1k (11k events total)
> Flush Time = 39 ms
> Scan Time = 7 ms
> 6.9 MB disk
>
> --------------------------------------------------------------------------------------------
> X = 10k (110k events total)
> Flush Time = 45 ms
> Scan Time = 8 ms
> 127 MB
>
> --------------------------------------------------------------------------------------------
> X = 100k (1.1M events total)
> Test1:
> Flush Time = 60 ms
> Scan Time = 12 ms
> 678 MB
>
> Test2:
> Flush Time = 45 ms
> Scan Time = 7 ms
> 576 MB
>
> --------------------------------------------------------------------------------------------
> X = 1M (11M events total)
> Test1:
> Flush Time = 52 ms
> Scan Time = 19 ms
> 7.2 GB
>
> Test2:
> Flush Time = 84 ms
> Scan Time = 34 ms
> 9.1 GB
>
> --------------------------------------------------------------------------------------------
> X = 2.5M (27.5M events total)
> Test1:
> Flush Time = 82 ms
> Scan Time = 63 ms
> 17 GB - 276 sst files
>
> Test2:
> Flush Time = 116 ms
> Scan Time = 35 ms
> 23 GB - 361 sst files
>
> Test3:
> Flush Time = 103 ms
> Scan Time = 82 ms
> 19 GB - 300 sst files
>
> --------------------------------------------------------------------------------------------
>
> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
> to X = 10M (110M events) but RocksDB was going into the 100GB+ range and my
> laptop ran out of disk. More extensive testing could be done, but I suspect
> that it would be in line with what we're seeing in the results above.
>
> At this point in time, I think the only major discussion point is really
> around what Jan and I have disagreed on: repartitioning back + resolving
> potential out-of-order issues, or leaving that up to the client to handle.
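For reference, the out-of-order resolution being debated can be sketched as a check at the final join: compare the FK a returning result was joined on against the left row's current FK, and drop the result if they no longer match (this is the approach John outlines further down-thread). All names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class FinalJoinResolveSketch {
    // Left table: primary key -> current foreign key (other row data omitted).
    static final Map<Integer, String> leftFk = new HashMap<>();

    // Final-join step: a result arriving back from the right side carries the FK
    // it was joined on. If the left row has since moved to a different FK, the
    // event is outdated and is dropped -- no separate high-watermark store needed.
    static String resolve(int leftKey, String joinedOnFk, String rightValue) {
        String currentFk = leftFk.get(leftKey);
        if (!joinedOnFk.equals(currentFk)) return null; // outdated: drop
        return currentFk + "," + rightValue;
    }

    public static void main(String[] args) {
        leftFk.put(1, "B"); // row 1 was (A, xyz) but has been updated to (B, xyz)
        // A stale result joined on the old FK "A" arrives: dropped.
        System.out.println(resolve(1, "A", "EntityA"));
        // The result joined on the current FK "B" is kept.
        System.out.println(resolve(1, "B", "EntityB"));
    }
}
```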
> > > Thanks folks, > > Adam > > > On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com> > wrote: > >> >> >> On 29.11.2018 15:14, John Roesler wrote: >> > Hi all, >> > >> > Sorry that this discussion petered out... I think the 2.1 release >> caused an >> > extended distraction that pushed it off everyone's radar (which was >> > precisely Adam's concern). Personally, I've also had some extend >> > distractions of my own that kept (and continue to keep) me preoccupied. >> > >> > However, calling for a vote did wake me up, so I guess Jan was on the >> right >> > track! >> > >> > I've gone back and reviewed the whole KIP document and the prior >> > discussion, and I'd like to offer a few thoughts: >> > >> > API Thoughts: >> > >> > 1. If I read the KIP right, you are proposing a many-to-one join. Could >> we >> > consider naming it manyToOneJoin? Or, if you prefer, flip the design >> around >> > and make it a oneToManyJoin? >> > >> > The proposed name "joinOnForeignKey" disguises the join type, and it >> seems >> > like it might trick some people into using it for a one-to-one join. >> This >> > would work, of course, but it would be super inefficient compared to a >> > simple rekey-and-join. >> > >> > 2. I might have missed it, but I don't think it's specified whether >> it's an >> > inner, outer, or left join. I'm guessing an outer join, as (neglecting >> IQ), >> > the rest can be achieved by filtering or by handling it in the >> ValueJoiner. >> > >> > 3. The arg list to joinOnForeignKey doesn't look quite right. >> > 3a. Regarding Serialized: There are a few different paradigms in play in >> > the Streams API, so it's confusing, but instead of three Serialized >> args, I >> > think it would be better to have one that allows (optionally) setting >> the 4 >> > incoming serdes. The result serde is defined by the Materialized. 
The >> > incoming serdes can be optional because they might already be available >> on >> > the source KTables, or the default serdes from the config might be >> > applicable. >> > >> > 3b. Is the StreamPartitioner necessary? The other joins don't allow >> setting >> > one, and it seems like it might actually be harmful, since the rekey >> > operation needs to produce results that are co-partitioned with the >> "other" >> > KTable. >> > >> > 4. I'm fine with the "reserved word" header, but I didn't actually >> follow >> > what Matthias meant about namespacing requiring "deserializing" the >> record >> > header. The headers are already Strings, so I don't think that >> > deserialization is required. If we applied the namespace at source nodes >> > and stripped it at sink nodes, this would be practically no overhead. >> The >> > advantage of the namespace idea is that no public API change wrt headers >> > needs to happen, and no restrictions need to be placed on users' >> headers. >> > >> > (Although I'm wondering if we can get away without the header at all... >> > stay tuned) >> > >> > 5. I also didn't follow the discussion about the HWM table growing >> without >> > bound. As I read it, the HWM table is effectively implementing OCC to >> > resolve the problem you noted with disordering when the rekey is >> > reversed... particularly notable when the FK changes. As such, it only >> > needs to track the most recent "version" (the offset in the source >> > partition) of each key. Therefore, it should have the same number of >> keys >> > as the source table at all times. >> > >> > I see that you are aware of KIP-258, which I think might be relevant in >> a >> > couple of ways. One: it's just about storing the timestamp in the state >> > store, but the ultimate idea is to effectively use the timestamp as an >> OCC >> > "version" to drop disordered updates. 
You wouldn't want to use the >> > timestamp for this operation, but if you were to use a similar >> mechanism to >> > store the source offset in the store alongside the re-keyed values, then >> > you could avoid a separate table. >> > >> > 6. You and Jan have been thinking about this for a long time, so I've >> > probably missed something here, but I'm wondering if we can avoid the >> HWM >> > tracking at all and resolve out-of-order during a final join instead... >> > >> > Let's say we're joining a left table (Integer K: Letter FK, (other >> data)) >> > to a right table (Letter K: (some data)). >> > >> > Left table: >> > 1: (A, xyz) >> > 2: (B, asd) >> > >> > Right table: >> > A: EntityA >> > B: EntityB >> > >> > We could do a rekey as you proposed with a combined key, but not >> > propagating the value at all.. >> > Rekey table: >> > A-1: (dummy value) >> > B-2: (dummy value) >> > >> > Which we then join with the right table to produce: >> > A-1: EntityA >> > B-2: EntityB >> > >> > Which gets rekeyed back: >> > 1: A, EntityA >> > 2: B, EntityB >> > >> > And finally we do the actual join: >> > Result table: >> > 1: ((A, xyz), EntityA) >> > 2: ((B, asd), EntityB) >> > >> > The thing is that in that last join, we have the opportunity to compare >> the >> > current FK in the left table with the incoming PK of the right table. If >> > they don't match, we just drop the event, since it must be outdated. >> > >> >> > In your KIP, you gave an example in which (1: A, xyz) gets updated to >> (1: >> > B, xyz), ultimately yielding a conundrum about whether the final state >> > should be (1: null) or (1: joined-on-B). With the algorithm above, you >> > would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, >> > EntityB)). It seems like this does give you enough information to make >> the >> > right choice, regardless of disordering. >> >> Will check Adams patch, but this should work. 
As mentioned often I am >> not convinced on partitioning back for the user automatically. I think >> this is the real performance eater ;) >> >> > >> > >> > 7. Last thought... I'm a little concerned about the performance of the >> > range scans when records change in the right table. You've said that >> you've >> > been using the algorithm you presented in production for a while. Can >> you >> > give us a sense of the performance characteristics you've observed? >> > >> >> Make it work, make it fast, make it beautiful. The topmost thing here is >> / was correctness. In practice I do not measure the performance of the >> range scan. Usual cases I run this with is emitting 500k - 1kk rows >> on a left hand side change. The range scan is just the work you gotta >> do, also when you pack your data into different formats, usually the >> rocks performance is very tight to the size of the data and we can't >> really change that. It is more important for users to prevent useless >> updates to begin with. My left hand side is guarded to drop changes that >> are not going to change my join output. >> >> usually it's: >> >> drop unused fields and then don't forward if old.equals(new) >> >> regarding to the performance of creating an iterator for smaller >> fanouts, users can still just do a group by first then anyways. >> >> >> >> > I could only think of one alternative, but I'm not sure if it's better >> or >> > worse... If the first re-key only needs to preserve the original key, >> as I >> > proposed in #6, then we could store a vector of keys in the value: >> > >> > Left table: >> > 1: A,... >> > 2: B,... >> > 3: A,... >> > >> > Gets re-keyed: >> > A: [1, 3] >> > B: [2] >> > >> > Then, the rhs part of the join would only need a regular single-key >> lookup. >> > Of course we have to deal with the problem of large values, as there's >> no >> > bound on the number of lhs records that can reference rhs records. 
>> Offhand, >> > I'd say we could page the values, so when one row is past the >> threshold, we >> > append the key for the next page. Then in most cases, it would be a >> single >> > key lookup, but for large fan-out updates, it would be one per (max >> value >> > size)/(avg lhs key size). >> > >> > This seems more complex, though... Plus, I think there's some extra >> > tracking we'd need to do to know when to emit a retraction. For example, >> > when record 1 is deleted, the re-key table would just have (A: [3]). >> Some >> > kind of tombstone is needed so that the join result for 1 can also be >> > retracted. >> > >> > That's all! >> > >> > Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the >> > discussion has been slow. >> > -John >> > >> > >> > On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> >> > wrote: >> > >> >> Id say you can just call the vote. >> >> >> >> that happens all the time, and if something comes up, it just goes back >> >> to discuss. >> >> >> >> would not expect to much attention with another another email in this >> >> thread. >> >> >> >> best Jan >> >> >> >> On 09.10.2018 13:56, Adam Bellemare wrote: >> >>> Hello Contributors >> >>> >> >>> I know that 2.1 is about to be released, but I do need to bump this to >> >> keep >> >>> visibility up. I am still intending to push this through once >> contributor >> >>> feedback is given. >> >>> >> >>> Main points that need addressing: >> >>> 1) Any way (or benefit) in structuring the current singular graph node >> >> into >> >>> multiple nodes? It has a whopping 25 parameters right now. I am a bit >> >> fuzzy >> >>> on how the optimizations are supposed to work, so I would appreciate >> any >> >>> help on this aspect. >> >>> >> >>> 2) Overall strategy for joining + resolving. This thread has much >> >> discourse >> >>> between Jan and I between the current highwater mark proposal and a >> >> groupBy >> >>> + reduce proposal. 
I am of the opinion that we need to strictly handle >> >> any >> >>> chance of out-of-order data and leave none of it up to the consumer. >> Any >> >>> comments or suggestions here would also help. >> >>> >> >>> 3) Anything else that you see that would prevent this from moving to a >> >> vote? >> >>> >> >>> Thanks >> >>> >> >>> Adam >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare < >> >> adam.bellem...@gmail.com> >> >>> wrote: >> >>> >> >>>> Hi Jan >> >>>> >> >>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, >> you >> >>>> actually only need to specify the amount of segments you want and how >> >> large >> >>>> they are. To the best of my understanding, what happens is that the >> >>>> segments are automatically rolled over as new data with new >> timestamps >> >> are >> >>>> created. We use this exact functionality in some of the work done >> >>>> internally at my company. For reference, this is the hopping windowed >> >> store. >> >>>> >> >>>> >> >> >> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21 >> >>>> >> >>>> In the code that I have provided, there are going to be two 24h >> >> segments. >> >>>> When a record is put into the windowStore, it will be inserted at >> time >> >> T in >> >>>> both segments. The two segments will always overlap by 12h. As time >> >> goes on >> >>>> and new records are added (say at time T+12h+), the oldest segment >> will >> >> be >> >>>> automatically deleted and a new segment created. The records are by >> >> default >> >>>> inserted with the context.timestamp(), such that it is the record >> time, >> >> not >> >>>> the clock time, which is used. >> >>>> >> >>>> To the best of my understanding, the timestamps are retained when >> >>>> restoring from the changelog. >> >>>> >> >>>> Basically, this is heavy-handed way to deal with TTL at a >> segment-level, >> >>>> instead of at an individual record level. 
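The segment-level TTL idea above can be sketched minimally. This simplification drops the 12-hour overlap between segments that Adam describes and keeps only the core mechanism: whole segments are discarded as record time advances, instead of expiring individual records. All names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class SegmentedStoreSketch {
    static final long SEGMENT_INTERVAL_MS = 12L * 60 * 60 * 1000; // 12h segments
    static final int NUM_SEGMENTS = 2;                            // ~24h retention

    // segmentId -> (key -> value). Expiry happens by dropping whole segments,
    // which is the "heavy-handed, segment-level TTL" described above.
    static final Map<Long, Map<String, Long>> segments = new HashMap<>();

    static void put(String key, long value, long recordTimestamp) {
        long segmentId = recordTimestamp / SEGMENT_INTERVAL_MS;
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
        // Drop every segment that has fallen out of the retention window.
        segments.keySet().removeIf(id -> id <= segmentId - NUM_SEGMENTS);
    }

    public static void main(String[] args) {
        put("hwm-1", 42L, 0L);                    // record time 0h -> segment 0
        put("hwm-2", 43L, 30L * 60 * 60 * 1000);  // record time 30h -> segment 2
        // Segment 0 has been dropped wholesale, taking hwm-1 with it.
        System.out.println(segments.containsKey(0L) + " " + segments.size());
        // prints: false 1
    }
}
```

Note the sketch uses record timestamps, matching the point above that context.timestamp() (record time, not wall-clock time) drives rollover.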
>> >>>> >> >>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak < >> jan.filip...@trivago.com> >> >>>> wrote: >> >>>> >> >>>>> Will that work? I expected it to blow up with ClassCastException or >> >>>>> similar. >> >>>>> >> >>>>> You either would have to specify the window you fetch/put or iterate >> >>>>> across all windows the key was found in right? >> >>>>> >> >>>>> I just hope the window-store doesn't check stream-time under the >> hoods >> >>>>> that would be a questionable interface. >> >>>>> >> >>>>> If it does: did you see my comment on checking all the windows >> earlier? >> >>>>> that would be needed to actually give reasonable time gurantees. >> >>>>> >> >>>>> Best >> >>>>> >> >>>>> >> >>>>> >> >>>>> On 25.09.2018 13:18, Adam Bellemare wrote: >> >>>>>> Hi Jan >> >>>>>> >> >>>>>> Check for " highwaterMat " in the PR. I only changed the state >> store, >> >>>>> not >> >>>>>> the ProcessorSupplier. >> >>>>>> >> >>>>>> Thanks, >> >>>>>> Adam >> >>>>>> >> >>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak < >> >> jan.filip...@trivago.com >> >>>>>> >> >>>>>> wrote: >> >>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote: >> >>>>>>> >> >>>>>>>> @Guozhang >> >>>>>>>> >> >>>>>>>> Thanks for the information. This is indeed something that will be >> >>>>>>>> extremely >> >>>>>>>> useful for this KIP. >> >>>>>>>> >> >>>>>>>> @Jan >> >>>>>>>> Thanks for your explanations. That being said, I will not be >> moving >> >>>>> ahead >> >>>>>>>> with an implementation using reshuffle/groupBy solution as you >> >>>>> propose. >> >>>>>>>> That being said, if you wish to implement it yourself off of my >> >>>>> current PR >> >>>>>>>> and submit it as a competitive alternative, I would be more than >> >>>>> happy to >> >>>>>>>> help vet that as an alternate solution. 
As it stands right now, >> I do >> >>>>> not >> >>>>>>>> really have more time to invest into alternatives without there >> >> being >> >>>>> a >> >>>>>>>> strong indication from the binding voters which they would >> prefer. >> >>>>>>>> >> >>>>>>>> >> >>>>>>> Hey, total no worries. I think I personally gave up on the streams >> >> DSL >> >>>>> for >> >>>>>>> some time already, otherwise I would have pulled this KIP through >> >>>>> already. >> >>>>>>> I am currently reimplementing my own DSL based on PAPI. >> >>>>>>> >> >>>>>>> >> >>>>>>>> I will look at finishing up my PR with the windowed state store >> in >> >> the >> >>>>>>>> next >> >>>>>>>> week or so, exercising it via tests, and then I will come back >> for >> >>>>> final >> >>>>>>>> discussions. In the meantime, I hope that any of the binding >> voters >> >>>>> could >> >>>>>>>> take a look at the KIP in the wiki. I have updated it according >> to >> >> the >> >>>>>>>> latest plan: >> >>>>>>>> >> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+ >> >>>>>>>> Support+non-key+joining+in+KTable >> >>>>>>>> >> >>>>>>>> I have also updated the KIP PR to use a windowed store. This >> could >> >> be >> >>>>>>>> replaced by the results of KIP-258 whenever they are completed. >> >>>>>>>> https://github.com/apache/kafka/pull/5527 >> >>>>>>>> >> >>>>>>>> Thanks, >> >>>>>>>> >> >>>>>>>> Adam >> >>>>>>>> >> >>>>>>> >> >>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in >> the >> >>>>> PR? >> >>>>>>> expected it to change to Windowed<K>,Long Missing something? >> >>>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang < >> wangg...@gmail.com> >> >>>>>>>> wrote: >> >>>>>>>> >> >>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, >> as it >> >>>>> is >> >>>>>>>>> for >> >>>>>>>>> corresponding changelog mechanisms. 
But as part of KIP-258 we do >> >>>>> want to >> >>>>>>>>> have "handling out-of-order data for source KTable" such that >> >>>>> instead of >> >>>>>>>>> blindly apply the updates to the materialized store, i.e. >> following >> >>>>>>>>> offset >> >>>>>>>>> ordering, we will reject updates that are older than the current >> >>>>> key's >> >>>>>>>>> timestamps, i.e. following timestamp ordering. >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> Guozhang >> >>>>>>>>> >> >>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang < >> >> wangg...@gmail.com> >> >>>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>> Hello Adam, >> >>>>>>>>>> >> >>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the >> >> high >> >>>>>>>>>> watermark store, now altered to be replaced with a window >> store), >> >> I >> >>>>>>>>>> think >> >>>>>>>>>> another current on-going KIP may actually help: >> >>>>>>>>>> >> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> >>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. >> only >> >>>>> for >> >>>>>>>>>> non-windowed KTable), and then one of its usage, as described >> in >> >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we >> can >> >>>>> then >> >>>>>>>>>> "reject" updates from the source topics if its timestamp is >> >> smaller >> >>>>> than >> >>>>>>>>>> the current key's latest update timestamp. I think it is very >> >>>>> similar to >> >>>>>>>>>> what you have in mind for high watermark based filtering, while >> >> you >> >>>>> only >> >>>>>>>>>> need to make sure that the timestamps of the joining records >> are >> >>>>>>>>>> >> >>>>>>>>> correctly >> >>>>>>>>> >> >>>>>>>>>> inherited though the whole topology to the final stage. 
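The timestamp-ordering idea Guozhang describes — reject an update whose timestamp is older than the key's latest — can be sketched minimally as follows (illustrative names only, not KIP-258's actual code).

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampOccSketch {
    // Per-key latest-seen timestamp, as KIP-258 would keep alongside the value.
    static final Map<String, Long> latestTs = new HashMap<>();
    static final Map<String, String> store = new HashMap<>();

    // Apply an update only if it is not older than what the key already holds:
    // timestamp ordering instead of blind offset ordering.
    static boolean maybeApply(String key, String value, long ts) {
        Long seen = latestTs.get(key);
        if (seen != null && ts < seen) return false; // out-of-order: reject
        latestTs.put(key, ts);
        store.put(key, value);
        return true;
    }

    public static void main(String[] args) {
        maybeApply("k", "v1", 100L);
        boolean applied = maybeApply("k", "v0", 50L); // a late arrival
        System.out.println(applied + " " + store.get("k")); // prints: false v1
    }
}
```

As noted above, this only works if record timestamps are correctly inherited through the whole topology to the final stage.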
>> >>>>>>>>>> >> >>>>>>>>>> Note that this KIP is for key-value store and hence >> non-windowed >> >>>>> KTables >> >>>>>>>>>> only, but for windowed KTables we do not really have a good >> >> support >> >>>>> for >> >>>>>>>>>> their joins anyways ( >> >>>>> https://issues.apache.org/jira/browse/KAFKA-7107) >> >>>>>>>>>> I >> >>>>>>>>>> think we can just consider non-windowed KTable-KTable non-key >> >> joins >> >>>>> for >> >>>>>>>>>> now. In which case, KIP-258 should help. >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> Guozhang >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak < >> >>>>> jan.filip...@trivago.com >> >>>>>>>>>>> >> >>>>>>>>>> wrote: >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote: >> >>>>>>>>>>> >> >>>>>>>>>>> Hi Guozhang >> >>>>>>>>>>>> >> >>>>>>>>>>>> Current highwater mark implementation would grow endlessly >> based >> >>>>> on >> >>>>>>>>>>>> primary key of original event. It is a pair of (<this table >> >>>>> primary >> >>>>>>>>>>>> >> >>>>>>>>>>> key>, >> >>>>>>>>> >> >>>>>>>>>> <highest offset seen for that key>). This is used to >> differentiate >> >>>>>>>>>>>> >> >>>>>>>>>>> between >> >>>>>>>>> >> >>>>>>>>>> late arrivals and new updates. My newest proposal would be to >> >>>>> replace >> >>>>>>>>>>>> >> >>>>>>>>>>> it >> >>>>>>>>> >> >>>>>>>>>> with a Windowed state store of Duration N. This would allow the >> >> same >> >>>>>>>>>>>> behaviour, but cap the size based on time. This should allow >> for >> >>>>> all >> >>>>>>>>>>>> late-arriving events to be processed, and should be >> customizable >> >>>>> by >> >>>>>>>>>>>> the >> >>>>>>>>>>>> user to tailor to their own needs (ie: perhaps just 10 >> minutes >> >> of >> >>>>>>>>>>>> >> >>>>>>>>>>> window, >> >>>>>>>>> >> >>>>>>>>>> or perhaps 7 days...). >> >>>>>>>>>>>> >> >>>>>>>>>>>> Hi Adam, using time based retention can do the trick here. 
>> Even >> >>>>> if I >> >>>>>>>>>>> would still like to see the automatic repartitioning optional >> >>>>> since I >> >>>>>>>>>>> >> >>>>>>>>>> would >> >>>>>>>>> >> >>>>>>>>>> just reshuffle again. With windowed store I am a little bit >> >>>>> sceptical >> >>>>>>>>>>> >> >>>>>>>>>> about >> >>>>>>>>> >> >>>>>>>>>> how to determine the window. So esentially one could run into >> >>>>> problems >> >>>>>>>>>>> >> >>>>>>>>>> when >> >>>>>>>>> >> >>>>>>>>>> the rapid change happens near a window border. I will check you >> >>>>>>>>>>> implementation in detail, if its problematic, we could still >> >> check >> >>>>>>>>>>> _all_ >> >>>>>>>>>>> windows on read with not to bad performance impact I guess. >> Will >> >>>>> let >> >>>>>>>>>>> you >> >>>>>>>>>>> know if the implementation would be correct as is. I wouldn't >> not >> >>>>> like >> >>>>>>>>>>> >> >>>>>>>>>> to >> >>>>>>>>> >> >>>>>>>>>> assume that: offset(A) < offset(B) => timestamp(A) < >> >> timestamp(B). >> >>>>> I >> >>>>>>>>>>> >> >>>>>>>>>> think >> >>>>>>>>> >> >>>>>>>>>> we can't expect that. >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> @Jan >> >>>>>>>>>>>> I believe I understand what you mean now - thanks for the >> >>>>> diagram, it >> >>>>>>>>>>>> did really help. You are correct that I do not have the >> original >> >>>>>>>>>>>> >> >>>>>>>>>>> primary >> >>>>>>>>> >> >>>>>>>>>> key available, and I can see that if it was available then you >> >>>>> would be >> >>>>>>>>>>>> able to add and remove events from the Map. That being said, >> I >> >>>>>>>>>>>> >> >>>>>>>>>>> encourage >> >>>>>>>>> >> >>>>>>>>>> you to finish your diagrams / charts just for clarity for >> everyone >> >>>>>>>>>>>> >> >>>>>>>>>>> else. >> >>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I >> >>>>> understand >> >>>>>>>>>>>> >> >>>>>>>>>>> the benefits for the rest. 
Sorry about the original primary >> key, >> >> We >> >>>>>>>>>>> have >> >>>>>>>>>>> join and Group by implemented our own in PAPI and basically >> not >> >>>>> using >> >>>>>>>>>>> >> >>>>>>>>>> any >> >>>>>>>>> >> >>>>>>>>>> DSL (Just the abstraction). Completely missed that in original >> DSL >> >>>>> its >> >>>>>>>>>>> >> >>>>>>>>>> not >> >>>>>>>>> >> >>>>>>>>>> there and just assumed it. total brain mess up on my end. Will >> >>>>> finish >> >>>>>>>>>>> >> >>>>>>>>>> the >> >>>>>>>>> >> >>>>>>>>>> chart as soon as i get a quite evening this week. >> >>>>>>>>>>> >> >>>>>>>>>>> My follow up question for you is, won't the Map stay inside >> the >> >>>>> State >> >>>>>>>>>>> >> >>>>>>>>>>>> Store indefinitely after all of the changes have propagated? >> >> Isn't >> >>>>>>>>>>>> this >> >>>>>>>>>>>> effectively the same as a highwater mark state store? >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thing is that if the map is empty, substractor is gonna >> return >> >>>>> `null` >> >>>>>>>>>>> >> >>>>>>>>>> and >> >>>>>>>>> >> >>>>>>>>>> the key is removed from the keyspace. But there is going to be >> a >> >>>>> store >> >>>>>>>>>>> 100%, the good thing is that I can use this store directly for >> >>>>>>>>>>> materialize() / enableSendingOldValues() is a regular store, >> >>>>> satisfying >> >>>>>>>>>>> all gurantees needed for further groupby / join. The Windowed >> >>>>> store is >> >>>>>>>>>>> >> >>>>>>>>>> not >> >>>>>>>>> >> >>>>>>>>>> keeping the values, so for the next statefull operation we >> would >> >>>>>>>>>>> need to instantiate an extra store. or we have the window >> store >> >>>>> also >> >>>>>>>>>>> >> >>>>>>>>>> have >> >>>>>>>>> >> >>>>>>>>>> the values then. >> >>>>>>>>>>> >> >>>>>>>>>>> Long story short. if we can flip in a custom group by before >> >>>>>>>>>>> repartitioning to the original primary key i think it would >> help >> >>>>> the >> >>>>>>>>>>> >> >>>>>>>>>> users >> >>>>>>>>> >> >>>>>>>>>> big time in building efficient apps. 
Given the original primary >> >> key >> >>>>>>>>>>> >> >>>>>>>>>> issue I >> >>>>>>>>> >> >>>>>>>>>> understand that we do not have a solid foundation to build on. >> >>>>>>>>>>> Leaving primary key carry along to the user. very >> unfortunate. I >> >>>>> could >> >>>>>>>>>>> understand the decision goes like that. I do not think its a >> good >> >>>>>>>>>>> >> >>>>>>>>>> decision. >> >>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thanks >> >>>>>>>>>>>> Adam >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre < >> >>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto: >> dumbreprajakta...@gmail.com >> >>>> >> >>>>>>>>>>>> >> >>>>>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>>>> please remove me from this group >> >>>>>>>>>>>> >> >>>>>>>>>>>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak >> >>>>>>>>>>>> <jan.filip...@trivago.com <mailto: >> >> jan.filip...@trivago.com >> >>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>> >> >>>>>>>>>>>> > Hi Adam, >> >>>>>>>>>>>> > >> >>>>>>>>>>>> > give me some time, will make such a chart. last >> time i >> >>>>> didn't >> >>>>>>>>>>>> get along >> >>>>>>>>>>>> > well with giphy and ruined all your charts. >> >>>>>>>>>>>> > Hopefully i can get it done today >> >>>>>>>>>>>> > >> >>>>>>>>>>>> > On 08.09.2018 16:00, Adam Bellemare wrote: >> >>>>>>>>>>>> > > Hi Jan >> >>>>>>>>>>>> > > >> >>>>>>>>>>>> > > I have included a diagram of what I attempted on >> the >> >>>>> KIP. 
>> >>>>>>>>>>>> > > >> >>>>>>>>>>>> > >> >>>>>>>>>>>> >> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su >> >>>>>>>>>>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining >> >>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate >> >>>>>>>>>>>> < >> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S >> >>>>>>>>>>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin >> >>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate> >> >>>>>>>>>>>> > > >> >>>>>>>>>>>> > > I attempted this back at the start of my own >> >>>>> implementation >> >>>>>>>>>>>> of >> >>>>>>>>>>>> this >> >>>>>>>>>>>> > > solution, and since I could not get it to work I >> have >> >>>>> since >> >>>>>>>>>>>> discarded the >> >>>>>>>>>>>> > > code. At this point in time, if you wish to >> continue >> >>>>> pursuing >> >>>>>>>>>>>> for your >> >>>>>>>>>>>> > > groupBy solution, I ask that you please create a >> >>>>> diagram on >> >>>>>>>>>>>> the KIP >> >>>>>>>>>>>> > > carefully explaining your solution. Please feel >> free >> >> to >> >>>>> use >> >>>>>>>>>>>> the image I >> >>>>>>>>>>>> > > just posted as a starting point. I am having >> trouble >> >>>>>>>>>>>> understanding your >> >>>>>>>>>>>> > > explanations but I think that a carefully >> constructed >> >>>>> diagram >> >>>>>>>>>>>> will clear >> >>>>>>>>>>>> > up >> >>>>>>>>>>>> > > any misunderstandings. Alternately, please post a >> >>>>>>>>>>>> comprehensive PR with >> >>>>>>>>>>>> > > your solution. I can only guess at what you >> mean, and >> >>>>> since I >> >>>>>>>>>>>> value my >> >>>>>>>>>>>> > own >> >>>>>>>>>>>> > > time as much as you value yours, I believe it is >> your >> >>>>>>>>>>>> responsibility to >> >>>>>>>>>>>> > > provide an implementation instead of me trying to >> >> guess. 
Adam

On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi James,

nice to see you being interested. Kafka Streams at this point supports all
sorts of joins as long as both streams have the same key. Adam is
currently implementing a join where a KTable and a KTable can have a
one-to-many relationship (1:n). We exploit that RocksDB is a datastore
that keeps data sorted (at least it exposes an API to access the stored
data in a sorted fashion).

I think the technical caveats are well understood now and we are basically
down to philosophy and API design (when Adam sees my newest message). I
have a lengthy track record of losing those kinds of arguments within the
streams community, and I have no clue why.
So I literally can't wait for you to churn through this thread and give
your opinion on how we should design the return type of the oneToManyJoin,
and how much power we want to give to the user vs. "simplicity" (which
isn't really simplicity, I argue, since users still need to understand it
anyway).

Waiting for you to join in on the discussion.

Best Jan

On 07.09.2018 15:49, James Kwan wrote:

I am new to this group and I found this subject interesting. Sounds like
you guys want to implement a join table of two streams? Is there somewhere
I can see the original requirement or proposal?

On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 05.09.2018 22:17, Adam Bellemare wrote:

> I'm currently testing using a Windowed Store to store the highwater
> mark.
> By all indications this should work fine, with the caveat being that it
> can only resolve out-of-order arrival for up to the size of the window
> (ie: 24h, 72h, etc). This would remove the possibility of it being
> unbounded in size.
>
> With regards to Jan's suggestion, I believe this is where we will have
> to remain in disagreement. While I do not disagree with your statement
> about there likely being additional joins done in a real-world
> workflow, I do not see how you can conclusively deal with out-of-order
> arrival of foreign-key changes and subsequent joins. I have attempted
> what I think you have proposed (without a high-water, using groupBy and
> reduce) and found that if the foreign key changes too quickly, or the
> load on a stream thread is too high, the joined messages will arrive
> out-of-order and be incorrectly propagated, such that an intermediate
> event is represented as the final event.
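(For illustration: the per-key highwater check Adam describes could be sketched as below, with a plain Java map standing in for the actual state store. The names and shape are hypothetical, not the KIP's implementation.)

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: resolve out-of-order joined records with a per-key highwater
// mark. An update is propagated only if its offset is at least the highest
// offset seen so far for that key; late arrivals are dropped so an
// intermediate event can never overwrite the final one.
public class HighwaterResolver {
    private final Map<String, Long> highwater = new HashMap<>();

    /** Returns true if the update should be propagated downstream. */
    public boolean propagate(String key, long offset) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // out-of-order: a stale join result arrived late
        }
        highwater.put(key, offset);
        return true;
    }

    public static void main(String[] args) {
        HighwaterResolver r = new HighwaterResolver();
        System.out.println(r.propagate("a", 1)); // true
        System.out.println(r.propagate("a", 3)); // true
        System.out.println(r.propagate("a", 2)); // false: stale result
    }
}
```

Note this is exactly the piece that a windowed store bounds in size: the `highwater` map here grows without limit, which is the unbounded-growth concern raised later in the thread.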
Can you shed some light on your groupBy implementation? There must be some
sort of flaw in it. I have a suspicion where it is; I would just like to
confirm. The idea is bulletproof and it must be an implementation mess-up.
I would like to clarify before we draw a conclusion.

> Repartitioning the scattered events back to their original partitions
> is the only way I know how to conclusively deal with out-of-order
> events in a given time frame, and to ensure that the data is eventually
> consistent with the input events.
>
> If you have some code to share that illustrates your approach, I would
> be very grateful as it would remove any misunderstandings that I may
> have.

Ah okay, you were looking for my code. I don't have something easily
readable here as it's bloated with OO patterns.
It's anyhow trivial:

    @Override
    public T apply(K aggKey, V value, T aggregate)
    {
        Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary helper
        U toModifyKey = mapper.apply(value);
        // This is the place where people are actually going to have
        // issues, and why you probably couldn't do it; we would need to
        // find a solution here. I didn't realize that yet. We propagate
        // the field in the joiner so that we can pick it up in an
        // aggregate. Probably you have not thought of this in your
        // approach, right? I am very open to finding a generic solution
        // here. In my honest opinion this is broken in KTableImpl.GroupBy:
        // it loses the keys and only maintains the aggregate key. I
        // abstracted it away back then, way before I was thinking of the
        // oneToMany join; that is why I didn't realize its significance
        // here. Opinions?

        for (V m : current) // "current": the aggregate's existing values
        {
            currentStateAsMap.put(mapper.apply(m), m);
        }
        if (isAdder)
        {
            currentStateAsMap.put(toModifyKey, value);
        }
        else
        {
            currentStateAsMap.remove(toModifyKey);
            if (currentStateAsMap.isEmpty())
            {
                return null;
            }
        }
        return asAggregateType(currentStateAsMap);
    }

> Thanks,
> Adam

On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Thanks Adam for bringing Matthias up to speed!

About the differences: I think re-keying back should be optional at best.
I would say we return a KScatteredTable, with reshuffle() returning
KTable<originalKey,Joined>, to make the backwards repartitioning optional.
I am also very much in favour of doing the out-of-order processing using a
group-by instead of high-water-mark tracking.
Just because unbounded growth is just scary + it saves us the header
stuff.

I think the abstraction of always repartitioning back is just not so
strong. Like, the work has been done before we partition back, and
grouping by something else afterwards is really common.

On 05.09.2018 13:49, Adam Bellemare wrote:

Hi Matthias

Thank you for your feedback, I do appreciate it!

> While name spacing would be possible, it would require deserializing
> user headers, which implies a runtime overhead. I would suggest not to
> namespace for now to avoid the overhead. If this becomes a problem in
> the future, we can still add name spacing later on.

Agreed. I will go with using a reserved string and document it.
> My main concern about the design is the type of the result KTable: If I
> understood the proposal correctly, ...

In your example, you have table1 and table2 swapped. Here is how it works
currently:

1) table1 has the records that contain the foreign key within their value.

table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
table2 input stream: <A,X>, <B,Y>

2) A value mapper is required to extract the foreign key.
table1 foreign key mapper: ( value => value.fk )

The mapper is applied to each element in table1, and a new combined key is
made:

table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>

3) The rekeyed events are copartitioned with table2:

a) Stream Thread with Partition 0:
RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
Table2: <A,X>

b) Stream Thread with Partition 1:
RepartitionedTable1: <B-c, (fk=B,bar=3)>
Table2: <B,Y>

4) From here, they can be joined together locally by applying the joiner
function.

At this point, Jan's design and my design deviate. My design goes on to
repartition the data post-join and resolve out-of-order arrival of
records, finally returning the data keyed on just the original key. I do
not expose the CombinedKey or any of the internals outside of the
joinOnForeignKey function.
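(To make steps 2-4 above concrete, here is a toy sketch of the rekey-then-join flow. The combined key is modeled as a plain String and the method name `joinOnForeignKey` is reused only for illustration; these are not the KIP's actual classes.)

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of steps 2-4: re-key each left-table record by a combined
// (foreignKey + "-" + originalKey) key, then join locally against the
// right table on the foreign-key part of that key.
public class CombinedKeyDemo {
    record LeftValue(String fk, int bar) {}

    static Map<String, String> joinOnForeignKey(Map<String, LeftValue> table1,
                                                Map<String, String> table2) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, LeftValue> e : table1.entrySet()) {
            // Step 2: extract the foreign key and build the combined key.
            String combinedKey = e.getValue().fk() + "-" + e.getKey();
            // Step 4: join locally against the right-table row.
            String rightValue = table2.get(e.getValue().fk());
            joined.put(combinedKey,
                "join(" + rightValue + "," + e.getValue().bar() + ")");
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, LeftValue> table1 = new LinkedHashMap<>();
        table1.put("a", new LeftValue("A", 1));
        table1.put("b", new LeftValue("A", 2));
        table1.put("c", new LeftValue("B", 3));
        Map<String, String> table2 = Map.of("A", "X", "B", "Y");

        System.out.println(joinOnForeignKey(table1, table2));
        // {A-a=join(X,1), A-b=join(X,2), B-c=join(Y,3)}
    }
}
```

Step 3 (copartitioning) is elided here: in the real topology the combined keys are hashed on the foreign-key prefix so each stream thread sees only the table2 rows it needs.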
This does make for a larger footprint, but it removes all agency for
resolving out-of-order arrivals and handling CombinedKeys from the user. I
believe that this makes the function much easier to use.

Let me know if this helps resolve your questions, and please feel free to
add anything else on your mind.

Thanks again,
Adam

On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I am just catching up on this thread. I did not read everything so far,
but want to share a couple of initial thoughts:

Headers: I think there is a fundamental difference between header usage in
this KIP and KIP-258. For 258, we add headers to changelog topics that are
owned by Kafka Streams, and nobody else is supposed to write into them.
In fact, no user header are written >> into >> >>>>> the >> >>>>>>>>>>>> changelog topic >> >>>>>>>>>>>> > >>>>>>>> and >> >>>>>>>>>>>> > >>>>>>>> thus, there are not conflicts. >> >>>>>>>>>>>> > >>>>>>>> >> >>>>>>>>>>>> > >>>>>>>> Nevertheless, I don't see a big issue with >> >> using >> >>>>>>>>>>>> headers within >> >>>>>>>>>>>> > >>>>>>>> Streams. >> >>>>>>>>>>>> > >>>>>>>> As long as we document it, we can have >> some >> >>>>> "reserved" >> >>>>>>>>>>>> header keys >> >>>>>>>>>>>> > >>>>>>>> and >> >>>>>>>>>>>> > >>>>>>>> users are not allowed to use when >> processing >> >>>>> data with >> >>>>>>>>>>>> Kafka >> >>>>>>>>>>>> > Streams. >> >>>>>>>>>>>> > >>>>>>>> IMHO, this should be ok. >> >>>>>>>>>>>> > >>>>>>>> >> >>>>>>>>>>>> > >>>>>>>> I think there is a safe way to avoid >> >> conflicts, >> >>>>> since >> >>>>>>>>>>>> these >> >>>>>>>>>>>> > headers >> >>>>>>>>>>>> > >>>>>>>> are >> >>>>>>>>>>>> > >>>>>>>> >> >>>>>>>>>>>> > >>>>>>>>> only needed in internal topics (I think): >> >>>>>>>>>>>> > >>>>>>>>> For internal and changelog topics, we can >> >>>>> namespace >> >>>>>>>>>>>> all headers: >> >>>>>>>>>>>> > >>>>>>>>> * user-defined headers are namespaced as >> >>>>> "external." >> >>>>>>>>>>>> + >> >>>>>>>>>>>> headerKey >> >>>>>>>>>>>> > >>>>>>>>> * internal headers are namespaced as >> >>>>> "internal." + >> >>>>>>>>>>>> headerKey >> >>>>>>>>>>>> > >>>>>>>>> >> >>>>>>>>>>>> > >>>>>>>>> While name spacing would be possible, it >> >> would >> >>>>>>>>>>>> require >> >>>>>>>>>>>> >> >>>>>>>>>>> to >> >>>>>>>>> >> >>>>>>>>>> > >>>>>>>> deserialize >> >>>>>>>>>>>> > >>>>>>>> user headers what implies a runtime >> overhead. >> >> I >> >>>>> would >> >>>>>>>>>>>> suggest to >> >>>>>>>>>>>> > no >> >>>>>>>>>>>> > >>>>>>>> namespace for now to avoid the overhead. >> If >> >> this >> >>>>>>>>>>>> >> >>>>>>>>>>> becomes a >> >>>>>>>>> >> >>>>>>>>>> > problem in >> >>>>>>>>>>>> > >>>>>>>> the future, we can still add name spacing >> >> later >> >>>>> on. 
My main concern about the design is the type of the result KTable: If I
understood the proposal correctly,

KTable<K1,V1> table1 = ...
KTable<K2,V2> table2 = ...

KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that the `joinedTable` has the same key as the left input table.
IMHO, this does not work because if table2 contains multiple rows that
join with a record in table1 (which is the main purpose of a foreign-key
join), the result table would only contain a single join result, but not
multiple.

Example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use the table2 value as a foreign key to the table1 key (ie, "A"
joins). If the result key is the same as the key of table1, this implies
that the result can be either <A, join(X,1)> or <A, join(X,2)>, but not
both.
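(The overwrite behaviour can be demonstrated with a plain map standing in for the result KTable's upsert semantics; names are illustrative only.)

```java
import java.util.HashMap;
import java.util.Map;

// A KTable is an upsert log: one value per key. If both join results are
// emitted under table1's key "A", the second silently replaces the first.
public class KeyCollisionDemo {
    public static void main(String[] args) {
        Map<String, String> resultTable = new HashMap<>();
        resultTable.put("A", "join(X,1)"); // from table2 record <a,(A,1)>
        resultTable.put("A", "join(X,2)"); // from table2 record <b,(A,2)>
        System.out.println(resultTable.size()); // 1: only one result kept

        // With combined keys, both join results coexist:
        Map<String, String> combined = new HashMap<>();
        combined.put("A-a", "join(X,1)");
        combined.put("A-b", "join(X,2)");
        System.out.println(combined.size()); // 2
    }
}
```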
Because they share the same key, whatever result record we emit later
overwrites the previous result.

This is the reason why Jan originally proposed to use a combination of
both primary keys of the input tables as the key of the output table. This
makes the keys of the output table unique and we can store both in the
output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>

Thoughts?

-Matthias

On 9/4/18 1:36 PM, Jan Filipiak wrote:

Just one remark here: the high-watermark could be disregarded. The
decision about the forward depends on the size of the aggregated map. Only
one-element maps would be unpacked and forwarded. Zero-element maps would
be published as a delete. Any other count of map entries is in a "waiting
for correct deletes to arrive" state.
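(A sketch of the forwarding rule Jan describes, assuming the reduce step materializes a map of combined-key entries; the names and shape are hypothetical.)

```java
import java.util.Map;

// Jan's rule: forward only when the aggregated map has exactly one entry;
// an empty map is published as a delete; anything else is still "waiting
// for correct deletes to arrive" and must not be forwarded yet.
public class ForwardDecision {
    enum Action { FORWARD, DELETE, WAIT }

    static Action decide(Map<String, String> aggregatedMap) {
        if (aggregatedMap.isEmpty()) {
            return Action.DELETE;
        }
        return aggregatedMap.size() == 1 ? Action.FORWARD : Action.WAIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of()));                         // DELETE
        System.out.println(decide(Map.of("A-a", "v1")));              // FORWARD
        System.out.println(decide(Map.of("A-a", "v1", "A-b", "v2"))); // WAIT
    }
}
```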
On 04.09.2018 21:29, Adam Bellemare wrote:

> It does look like I could replace the second repartition store and
> highwater store with a groupBy and reduce. However, it looks like I
> would still need to store the highwater value within the materialized
> store, to compare the arrival of out-of-order records (assuming my
> understanding of THIS is correct...). This in effect is the same as the
> design I have now, just with the two tables merged together.

--
-- Guozhang