Yep, I definitely misunderstood some of the groupBy and groupByKey functionality. I would say disregard what I said in my previous email w.r.t. my assumptions about record size. I was looking into the code more today and I did not understand it correctly the first time I read it.
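For anyone skimming the thread, the out-of-order check that the highwater value exists to support can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: the class name `HighwaterFilter`, the method `accept`, and the use of a per-key `long` offset as the ordering value are all hypothetical, and a plain `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; none of these names come from the PR. */
final class HighwaterFilter<K> {
    // Highest offset seen so far per key. A HashMap stands in for the
    // state store here, so like the highwater table it grows unbounded.
    private final Map<K, Long> highwater = new HashMap<>();

    /** Returns true if no newer record has already been seen for this key. */
    boolean accept(K key, long offset) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // stale: a newer update for this key already passed
        }
        highwater.put(key, offset);
        return true;
    }

    public static void main(String[] args) {
        HighwaterFilter<String> filter = new HighwaterFilter<>();
        System.out.println(filter.accept("k", 11L)); // newer result arrives first
        System.out.println(filter.accept("k", 10L)); // older result arrives late
    }
}
```

Whether the comparison lives in a separate table or inside a reducer, the stored value per key is the same single number, which is why merging the two tables does not obviously simplify anything.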
It does look like I could replace the second repartition store and highwater store with a groupBy and reduce. However, it looks like I would still need to store the highwater value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...). This in effect is the same as the design I have now, just with the two tables merged together. I will keep looking at this but I am not seeing a great simplification. Advice and comments are welcomed as always.

On Tue, Sep 4, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

> As I was looking more into RocksDB TTL, I see that we currently do not
> support it in Kafka Streams due to a number of technical reasons. As I
> don't think that I will be tackling that JIRA at the moment, the current
> implementation is indeed unbounded in the highwater table growth.
>
> An alternate option may be to replace the highwater mark table with a
> groupBy and then perform a reduce/aggregate. My main concern here is that
> technically we could have an unbounded amount of data to group together by
> key, and the grouped size could exceed the Kafka maximum record size. When
> I built the highwater mark table my intent was to work around this
> possibility, as each record is evaluated independently and record sizing
> issues do not come into play. If I am incorrect in this assumption, please
> correct me, because I am a bit fuzzy on exactly how the groupBy currently
> works.
>
> Any thoughts on this are appreciated. I will revisit it again when I have
> a bit more time.
>
> Thanks
>
> On Mon, Sep 3, 2018 at 4:55 PM, Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
>> Hi Jan
>>
>> Thank you for taking the time to look into my PR. I have updated it
>> accordingly along with the suggestions from John. Please note that I am by
>> no means an expert on Java, so I really do appreciate any Java-specific
>> feedback you may have.
>> Do not worry about being overly verbose on it.
>>
>> You are correct with regards to the highwater mark growing unbounded. One
>> option would be to implement the RocksDB TTL to expire records. I am open
>> to other options as well.
>>
>> I have tried to detail the reasoning behind it in the KIP - I have added
>> additional comments and I hope that it is clearer now.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-MultipleRapidForeign-KeyValueChanges-Whyahighwatertableisrequired
>>
>> Please keep in mind that there may be something about ordering guarantees
>> that I am not aware of. As far as I know, once you begin to operate on
>> events in parallel across different nodes within the processor API, there
>> are no ordering guarantees and everything is simple first-come,
>> first-served (processed). If this is not the case then I am unaware of
>> that fact.
>>
>> Thanks
>>
>> Adam
>>
>> On Mon, Sep 3, 2018 at 8:38 AM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Finished my deeper scan on your approach.
>>> Most of the comments I put at the PR are minor code style things.
>>> One forward call seems to be a bug though, would be great if you could
>>> double check.
>>>
>>> The one problem I see is that the high watermark store grows unbounded.
>>> A key being deleted from the source table does not lead to deletion in
>>> the watermark store.
>>>
>>> I also don't quite grasp the concept why it's needed. I think the whole
>>> offset part can go away?
>>> It seems to deal with node failures of some kind but everything should
>>> turn out okay without it?
>>>
>>> Best Jan
>>>
>>> On 01.09.2018 20:44, Guozhang Wang wrote:
>>>
>>>> Yes Adam, that makes sense.
>>>>
>>>> I think it may be better to have a working PR to review before we
>>>> complete the VOTE thread.
>>>> In my previous experience, large features like this are most
>>>> definitely going to miss some devils in the details in the design
>>>> and wiki discussion phases.
>>>>
>>>> That would unfortunately mean that your implementations may need to be
>>>> modified / updated along with the review and further KIP discussion. I
>>>> can understand this can be painful, but that may be the best option we
>>>> have to avoid as much wasted work as possible.
>>>>
>>>> Guozhang
>>>>
>>>> On Wed, Aug 29, 2018 at 10:06 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guozhang
>>>>>
>>>>> By workflow I mean just the overall process of how the KIP is
>>>>> implemented. Any ideas on the ways to reduce the topic count,
>>>>> materializations, if there is a better way to resolve out-of-order
>>>>> than a highwater mark table, if the design philosophy of “keep
>>>>> everything encapsulated within the join function” is appropriate,
>>>>> etc. I can implement the changes that John suggested, but if my
>>>>> overall workflow is not acceptable I would rather address that before
>>>>> making minor changes.
>>>>>
>>>>> If this requires a full candidate PR ready to go to prod then I can
>>>>> make those changes. Hope that clears things up.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Adam
>>>>>
>>>>> On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Adam,
>>>>>>
>>>>>> What do you mean by "additional comments on the workflow."? Do you
>>>>>> mean to let others review your PR
>>>>>> https://github.com/apache/kafka/pull/5527 ? Is it ready for reviews?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Okay, I will implement John's suggestion of namespacing the external
>>>>>>> headers prior to processing, and then removing the namespacing prior
>>>>>>> to emitting. A potential future KIP could be to provide this
>>>>>>> namespacing automatically.
>>>>>>>
>>>>>>> I would also appreciate any other additional comments on the
>>>>>>> workflow. My goal is to suss out agreement prior to moving to a vote.
>>>>>>>
>>>>>>> On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I like John's idea as well: for this KIP specifically as we do not
>>>>>>>> expect any other consumers to read the repartition topics
>>>>>>>> externally, we can slightly prefix the header to be safe, while
>>>>>>>> keeping the additional cost (note the header field is per-record,
>>>>>>>> so any additional byte is per-record as well) low.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi John
>>>>>>>>>
>>>>>>>>> That is an excellent idea. The header usage I propose would be
>>>>>>>>> limited entirely to internal topics, and this could very well be
>>>>>>>>> the solution to potential conflicts. If we do not officially
>>>>>>>>> reserve a prefix "__" then I think this would be the safest idea,
>>>>>>>>> as it would entirely avoid any accidents (perhaps if a company is
>>>>>>>>> using its own "__" prefix for other reasons).
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>> On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Just a quick thought regarding headers:
>>>>>>>>>>
>>>>>>>>>>> I think there is no absolutely safe way to avoid conflicts, but
>>>>>>>>>>> we can still consider using some name patterns to reduce the
>>>>>>>>>>> likelihood as much as possible, e.g. consider sth. like the
>>>>>>>>>>> internal topics naming: "__internal_[name]"?
>>>>>>>>>>
>>>>>>>>>> I think there is a safe way to avoid conflicts, since these
>>>>>>>>>> headers are only needed in internal topics (I think):
>>>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>>>
>>>>>>>>>> This is a lot of characters, so we could use a sigil instead
>>>>>>>>>> (e.g., "_" for internal, "~" for external)
>>>>>>>>>>
>>>>>>>>>> We simply apply the namespacing when we read user headers from
>>>>>>>>>> external topics into the topology and then de-namespace them
>>>>>>>>>> before we emit them to an external topic (via "to" or "through").
>>>>>>>>>> Now, it is not possible to collide with user-defined headers.
>>>>>>>>>>
>>>>>>>>>> That said, I'd also be fine with just reserving "__" as a header
>>>>>>>>>> prefix and not worrying about collisions.
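The namespacing scheme John describes above could be sketched along these lines. This is a simplified illustration, not Kafka Streams code: the class `HeaderNamespacing` and its methods are hypothetical, a `Map` stands in for the real record headers (which, unlike a map, allow repeated keys), and dropping the internal headers on the way out is an assumption, since the thread only says user headers are de-namespaced before emitting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Kafka Streams. */
final class HeaderNamespacing {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    /** Prefix every user-supplied header key on the way into the topology. */
    static Map<String, byte[]> namespaceUserHeaders(Map<String, byte[]> userHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : userHeaders.entrySet()) {
            out.put(EXTERNAL + e.getKey(), e.getValue());
        }
        return out;
    }

    /** Add a header owned by the join itself, under the internal prefix. */
    static void putInternal(Map<String, byte[]> headers, String key, byte[] value) {
        headers.put(INTERNAL + key, value);
    }

    /**
     * Before emitting to an external topic: strip the prefix from user
     * headers. Internal headers are dropped here, which is an assumption.
     */
    static Map<String, byte[]> denamespace(Map<String, byte[]> headers) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            if (e.getKey().startsWith(EXTERNAL)) {
                out.put(e.getKey().substring(EXTERNAL.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The user already has a header with the name the join wants to use.
        Map<String, byte[]> user = new LinkedHashMap<>();
        user.put("foreignKeyOffset", new byte[] {1});

        Map<String, byte[]> inTopology = namespaceUserHeaders(user);
        putInternal(inTopology, "foreignKeyOffset", new byte[] {10});

        System.out.println(inTopology.keySet());             // both keys coexist
        System.out.println(denamespace(inTopology).keySet()); // user key restored
    }
}
```

With both prefixes applied, the user's `foreignKeyOffset` and the join's `foreignKeyOffset` never share a key inside the topology, at the cost of a few extra bytes per record, which is the trade-off the sigil variant tries to reduce.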
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP,
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Still haven't completely grasped it.
>>>>>>>>>>> Sorry, will read more.
>>>>>>>>>>>
>>>>>>>>>>> On 17.08.2018 21:23, Jan Filipiak wrote:
>>>>>>>>>>>> Cool stuff.
>>>>>>>>>>>>
>>>>>>>>>>>> I made some random remarks. Did not touch the core of the
>>>>>>>>>>>> algorithm yet.
>>>>>>>>>>>>
>>>>>>>>>>>> Will do Monday 100%
>>>>>>>>>>>>
>>>>>>>>>>>> I don't see Interactive Queries :) like that!
>>>>>>>>>>>>
>>>>>>>>>>>> On 17.08.2018 20:28, Adam Bellemare wrote:
>>>>>>>>>>>>> I have submitted a PR with my code against trunk:
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do I continue on this thread or do we begin a new one for
>>>>>>>>>>>>> discussion?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Even before message headers, the option for me always existed
>>>>>>>>>>>>>> to just wrap the messages into my own custom envelope.
>>>>>>>>>>>>>> So I of course thought this through. One sentence in your last
>>>>>>>>>>>>>> email triggered all the thought process I put in back then
>>>>>>>>>>>>>> again to design it in the, what I think is the "kafka-way". It
>>>>>>>>>>>>>> ended up ranting a little about what happened in the past.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see plenty of colleagues of mine falling into traps in the
>>>>>>>>>>>>>> API, that I did warn about in the 1.0 DSL rewrite. I have the
>>>>>>>>>>>>>> same feeling again.
>>>>>>>>>>>>>> So I hope it gives you some insights into my thought
>>>>>>>>>>>>>> process. I am aware that since I never ported 213 to a higher
>>>>>>>>>>>>>> streams version, I don't really have a stake here and
>>>>>>>>>>>>>> initially I didn't feel like actually sending it. But maybe
>>>>>>>>>>>>>> you can pull something good from it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 15.08.2018 04:44, Adam Bellemare wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>> Thanks Jan. I take it you mean "key-widening" somehow
>>>>>>>>>>>>>>> includes information about which record is processed first? I
>>>>>>>>>>>>>>> understand about a CombinedKey with both the Foreign and
>>>>>>>>>>>>>>> Primary key, but I don't see how you track ordering metadata
>>>>>>>>>>>>>>> in there unless you actually included a metadata field in the
>>>>>>>>>>>>>>> key type as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>>> As Jan mentioned earlier, are Record Headers meant to strictly
>>>>>>>>>>>>>>> be used in just the user-space? It seems that it is possible
>>>>>>>>>>>>>>> that a collision on the (key,value) tuple I wish to add to it
>>>>>>>>>>>>>>> could occur. For instance, if I wanted to add a
>>>>>>>>>>>>>>> ("foreignKeyOffset",10) to the Headers but the user already
>>>>>>>>>>>>>>> specified their own header with the same key name, then it
>>>>>>>>>>>>>>> appears there would be a collision.
>>>>>>>>>>>>>>> (This is one of the issues I brought up in the KIP).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --------------------------------
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will be posting a prototype PR against trunk within the
>>>>>>>>>>>>>>> next day or two. One thing I need to point out is that my
>>>>>>>>>>>>>>> design very strictly wraps the entire foreignKeyJoin process
>>>>>>>>>>>>>>> within the DSL function. There is no exposure of CombinedKeys
>>>>>>>>>>>>>>> or widened keys, nothing to resolve with regards to
>>>>>>>>>>>>>>> out-of-order processing and no need for the DSL user to even
>>>>>>>>>>>>>>> know what's going on inside of the function. The code simply
>>>>>>>>>>>>>>> returns the results of the join, keyed by the original key.
>>>>>>>>>>>>>>> Currently my API mirrors identically the format of the data
>>>>>>>>>>>>>>> returned by the regular join function, and I believe that
>>>>>>>>>>>>>>> this is very useful to many users of the DSL. It is my
>>>>>>>>>>>>>>> understanding that one of the main design goals of the DSL is
>>>>>>>>>>>>>>> to provide higher level functionality without requiring the
>>>>>>>>>>>>>>> users to know exactly what's going on under the hood.
>>>>>>>>>>>>>>> With this in mind, I thought it best to solve ordering and
>>>>>>>>>>>>>>> partitioning problems within the function and eliminate the
>>>>>>>>>>>>>>> requirement for users to do additional work after the fact to
>>>>>>>>>>>>>>> resolve the results of their join. Basically, I am assuming
>>>>>>>>>>>>>>> that most users of the DSL just "want it to work" and want it
>>>>>>>>>>>>>>> to be easy. I did this operating under the assumption that if
>>>>>>>>>>>>>>> a user truly wants to optimize their own workflow down to the
>>>>>>>>>>>>>>> finest details then they will break from strictly using the
>>>>>>>>>>>>>>> DSL and move down to the Processor API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the abstraction is not powerful enough to not have
>>>>>>>>>>>>>> kafka specifics leak up. The leak I currently think this has
>>>>>>>>>>>>>> is that you can not reliably prevent the delete coming out
>>>>>>>>>>>>>> first, before you emit the correct new record, as it is an
>>>>>>>>>>>>>> abstraction entirely around kafka.
>>>>>>>>>>>>>> I can only recommend not to. Honesty and simplicity should
>>>>>>>>>>>>>> always be first prio; trying to hide this just makes it more
>>>>>>>>>>>>>> complex, less understandable and will lead to mistakes in
>>>>>>>>>>>>>> usage.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Exactly why I am also in big disfavour of GraphNodes and later
>>>>>>>>>>>>>> optimization stages.
>>>>>>>>>>>>>> Can someone give me an example of an optimisation that really
>>>>>>>>>>>>>> can't be handled by the user constructing his topology
>>>>>>>>>>>>>> differently?
>>>>>>>>>>>>>> Having reusable Processor API components accessible by the DSL
>>>>>>>>>>>>>> and composable as one likes is exactly where DSL should max
>>>>>>>>>>>>>> out and KSQL should do the next step.
>>>>>>>>>>>>>> I find it very unprofessional from a software engineering
>>>>>>>>>>>>>> approach to run software where you can not at least sensibly
>>>>>>>>>>>>>> reason about the inner workings of the libraries used.
>>>>>>>>>>>>>> Given this, people have to read and understand it anyway, so
>>>>>>>>>>>>>> why try to hide it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I really miss the beauty of 0.10 version DSL.
>>>>>>>>>>>>>> Apparently not a thing I can influence but just warn about.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @guozhang
>>>>>>>>>>>>>> you can't imagine how many extra IQ-Statestores I constantly
>>>>>>>>>>>>>> prune from stream apps because people just keep passing
>>>>>>>>>>>>>> Materialized's into all the operations. :D :'-(
>>>>>>>>>>>>>> I regret that I couldn't convince you guys back then. Plus
>>>>>>>>>>>>>> this whole entire topology as a floating interface chain,
>>>>>>>>>>>>>> never seen it anywhere :-/ :'(
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't know. I guess this is just me regretting to only have
>>>>>>>>>>>>>> 24h/day.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I updated the KIP today with some points worth talking about,
>>>>>>>>>>>>>>> should anyone be so inclined to check it out. Currently we
>>>>>>>>>>>>>>> are running this code in production to handle relational
>>>>>>>>>>>>>>> joins from our Kafka Connect topics, as per the original
>>>>>>>>>>>>>>> motivation of the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I believe the foreignKeyJoin should be responsible for. In my
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As for your question regarding GraphNodes, it is for
>>>>>>>>>>>>>>>> extending Streams optimization framework. You can find more
>>>>>>>>>>>>>>>> details on https://issues.apache.org/jira/browse/KAFKA-6761.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main idea is that instead of directly building up the
>>>>>>>>>>>>>>>> "physical topology" (represented as Topology in the public
>>>>>>>>>>>>>>>> package, and internally built as the ProcessorTopology
>>>>>>>>>>>>>>>> class) while users are specifying the transformation
>>>>>>>>>>>>>>>> operators, we first keep it as a "logical topology"
>>>>>>>>>>>>>>>> (represented as GraphNode inside InternalStreamsBuilder).
>>>>>>>>>>>>>>>> And then only execute the optimization and the construction
>>>>>>>>>>>>>>>> of the "physical" Topology when StreamsBuilder.build() is
>>>>>>>>>>>>>>>> called.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Back to your question, I think it makes more sense to add a
>>>>>>>>>>>>>>>> new type of StreamsGraphNode (maybe you can consider
>>>>>>>>>>>>>>>> inheriting from the BaseJoinProcessorNode). Note that
>>>>>>>>>>>>>>>> although in the Topology we will have multiple connected
>>>>>>>>>>>>>>>> ProcessorNodes to represent a (foreign-key) join, we still
>>>>>>>>>>>>>>>> want to keep it as a single StreamsGraphNode, or just a
>>>>>>>>>>>>>>>> couple of them in the logical representation so that in the
>>>>>>>>>>>>>>>> future we can construct the physical topology differently
>>>>>>>>>>>>>>>> (e.g. having another way than the current distributed
>>>>>>>>>>>>>>>> hash-join).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -------------------------------------------------------
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Back to your questions to KIP-213, I think Jan has
>>>>>>>>>>>>>>>> summarized it pretty well. Note that back then we did not
>>>>>>>>>>>>>>>> have headers support, so we had to use such a "key-widening"
>>>>>>>>>>>>>>>> approach to ensure ordering.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I love how you are on to this already! I resolve this by
>>>>>>>>>>>>>>>>> "key-widening": I treat the results of FK A and FK B
>>>>>>>>>>>>>>>>> differently.
>>>>>>>>>>>>>>>>> As you can see the output of my join has a Combined Key and
>>>>>>>>>>>>>>>>> therefore I can resolve the "race condition" in a group by
>>>>>>>>>>>>>>>>> if I so desire.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this reflects more what happens under the hood and
>>>>>>>>>>>>>>>>> makes it more clear to the user what is going on. The idea
>>>>>>>>>>>>>>>>> of hiding this behind metadata and handling it in the DSL
>>>>>>>>>>>>>>>>> is from my POV not ideal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To write into your example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (key + A, null)
>>>>>>>>>>>>>>>>> (key + B, <joined On FK = B>)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> is what my output would look like.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hope that makes sense :D
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If you do not use headers or other metadata, how do you
>>>>>>>>>>>>>>>>>> ensure that changes to the foreign-key value are not
>>>>>>>>>>>>>>>>>> resolved out-of-order?
>>>>>>>>>>>>>>>>>> I.e.: if an event has FK = A, but you change it to FK = B,
>>>>>>>>>>>>>>>>>> you need to propagate both a delete (FK=A -> null) and an
>>>>>>>>>>>>>>>>>> addition (FK=B). In my solution, without maintaining any
>>>>>>>>>>>>>>>>>> metadata, it is possible for the final output to be in
>>>>>>>>>>>>>>>>>> either order - the correctly updated joined value, or the
>>>>>>>>>>>>>>>>>> null for the delete.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>>>> (key, <joined On FK = B>)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (key, <joined On FK = B>)
>>>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I looked back through your code and through the discussion
>>>>>>>>>>>>>>>>>> threads, and didn't see any information on how you
>>>>>>>>>>>>>>>>>> resolved this. I have a version of my code working for
>>>>>>>>>>>>>>>>>> 2.0, I am just adding more integration tests and will
>>>>>>>>>>>>>>>>>> update the KIP accordingly. Any insight you could provide
>>>>>>>>>>>>>>>>>> on resolving out-of-order semantics without metadata would
>>>>>>>>>>>>>>>>>> be helpful.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Happy to see that you want to make an effort here.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding the ProcessSuppliers I couldn't find a way to
>>>>>>>>>>>>>>>>>>> not rewrite the joiners + the merger.
>>>>>>>>>>>>>>>>>>> The re-partitioners can be reused in theory. I don't know
>>>>>>>>>>>>>>>>>>> if repartition is optimized in 2.0 now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I made this
>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
>>>>>>>>>>>>>>>>>>> back then and we are running KIP-213 with KIP-241 in
>>>>>>>>>>>>>>>>>>> combination.
>>>>>>>>>>>>>>>>>>> For us it is vital as it minimized the size we had in our
>>>>>>>>>>>>>>>>>>> repartition topics plus it removed the factor of 2 in
>>>>>>>>>>>>>>>>>>> events on every message.
>>>>>>>>>>>>>>>>>>> I know about this new "delete once consumer has read it".
>>>>>>>>>>>>>>>>>>> I don't think 241 is vital for all use cases, for ours it
>>>>>>>>>>>>>>>>>>> is. I wanted to use 213 to sneak in the foundations for
>>>>>>>>>>>>>>>>>>> 241 as well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't quite understand what a PropagationWrapper is,
>>>>>>>>>>>>>>>>>>> but I am certain that you do not need RecordHeaders for
>>>>>>>>>>>>>>>>>>> 213 and I would try to leave them out. They either belong
>>>>>>>>>>>>>>>>>>> to the DSL or to the user, having a mixed use is to be
>>>>>>>>>>>>>>>>>>> avoided.
>>>>>>>>>>>>>>>>>>> We run the join with 0.8 logformat and I don't think one
>>>>>>>>>>>>>>>>>>> needs more.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This KIP will be very valuable for the streams project! I
>>>>>>>>>>>>>>>>>>> could never convince myself to invest into the 1.0+ DSL
>>>>>>>>>>>>>>>>>>> as I used almost all my energy to fight against it. Maybe
>>>>>>>>>>>>>>>>>>> this can also help me see the good sides a little bit
>>>>>>>>>>>>>>>>>>> more.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If there is anything unclear with all the text that has
>>>>>>>>>>>>>>>>>>> been written, feel free to just directly cc me so I don't
>>>>>>>>>>>>>>>>>>> miss it on the mailing list.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> More followup, and +dev as Guozhang replied to me
>>>>>>>>>>>>>>>>>>>> directly previously.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am currently porting the code over to trunk. One of
>>>>>>>>>>>>>>>>>>>> the major changes since 1.0 is the usage of GraphNodes.
>>>>>>>>>>>>>>>>>>>> I have a question about this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For a foreignKey joiner, should it have its own
>>>>>>>>>>>>>>>>>>>> dedicated node type? Or would it be advisable to
>>>>>>>>>>>>>>>>>>>> construct it from existing GraphNode components?