I have submitted a PR with my code against trunk: https://github.com/apache/kafka/pull/5527
Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Even before message headers, the option always existed for me to just wrap the messages into my own custom envelope, so I of course thought this through. One sentence in your last email triggered all the thought process I put in back then to design it in what I think is the "Kafka way". It ended up being a little bit of a rant about what happened in the past.
> I see plenty of colleagues of mine falling into traps in the API that I did warn about in the 1.0 DSL rewrite. I have the same feeling again. So I hope this gives you some insight into my thought process. I am aware that since I never ported 213 to a higher Streams version, I don't really have a stake here, and initially I didn't feel like actually sending it. But maybe you can pull something good from it.
> Best, Jan
>
> On 15.08.2018 04:44, Adam Bellemare wrote:
>
>> @Jan
>> Thanks Jan. I take it you mean "key-widening" somehow includes information about which record is processed first? I understand about a CombinedKey with both the foreign and primary key, but I don't see how you track ordering metadata in there unless you actually included a metadata field in the key type as well.
>> @Guozhang
>> As Jan mentioned earlier, are Record Headers meant to be used strictly in user space? It seems possible that a collision on the (key, value) tuple I wish to add could occur. For instance, if I wanted to add ("foreignKeyOffset", 10) to the Headers but the user already specified their own header with the same key name, then it appears there would be a collision. (This is one of the issues I brought up in the KIP.)
>> --------------------------------
>> I will be posting a prototype PR against trunk within the next day or two. One thing I need to point out is that my design very strictly wraps the entire foreignKeyJoin process within the DSL function. There is no exposure of CombinedKeys or widened keys, nothing to resolve with regard to out-of-order processing, and no need for the DSL user to even know what's going on inside of the function. The code simply returns the results of the join, keyed by the original key. Currently my API mirrors the format of the data returned by the regular join function identically, and I believe that this is very useful to many users of the DSL. It is my understanding that one of the main design goals of the DSL is to provide higher-level functionality without requiring the users to know exactly what's going on under the hood. With this in mind, I thought it best to solve the ordering and partitioning problems within the function and eliminate the requirement for users to do additional work after the fact to resolve the results of their join. Basically, I am assuming that most users of the DSL just "want it to work" and want it to be easy. I did this operating under the assumption that if a user truly wants to optimize their own workflow down to the finest details, then they will break from strictly using the DSL and move down to the Processor API.
>
> I think the abstraction is not powerful enough to avoid having Kafka specifics leak up. The leak I currently think this has is that you cannot reliably prevent the delete coming out first, before you emit the correct new record.
> As it is an abstraction entirely around Kafka, I can only recommend not to. Honesty and simplicity should always be first priority; trying to hide this just makes it more complex, less understandable, and will lead to mistakes in usage.
> Exactly why I am also in big disfavour of GraphNodes and later optimization stages. Can someone give me an example of an optimisation that really can't be handled by the user constructing his topology differently? Having reusable Processor API components accessible from the DSL and composable as one likes is exactly where the DSL should max out, and KSQL should do the next step. I find it very unprofessional, from a software engineering standpoint, to run software where you cannot at least sensibly reason about the inner workings of the libraries used. Given that people have to read and understand it anyway, why try to hide it?
> I really miss the beauty of the 0.10 version of the DSL. Apparently not a thing I can influence, but just warn about.
> @Guozhang: you can't imagine how many extra IQ state stores I constantly prune from stream apps because people just keep passing Materialized's into all the operations. :D :'-( I regret that I couldn't convince you guys back then. Plus this whole topology as a fluent interface chain; I've never seen it anywhere. :-/ :'(
> I don't know. I guess this is just me regretting to only have 24h/day.
>
>> I updated the KIP today with some points worth talking about, should anyone be so inclined to check it out. Currently we are running this code in production to handle relational joins from our Kafka Connect topics, as per the original motivation of the KIP.
>> I believe the foreignKeyJoin should be responsible for. In my
>>
>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hello Adam,
>>> As for your question regarding GraphNodes, it is for extending the Streams optimization framework. You can find more details on https://issues.apache.org/jira/browse/KAFKA-6761.
>>> The main idea is that instead of directly building up the "physical topology" (represented as Topology in the public package, and internally built as the ProcessorTopology class) while users are specifying the transformation operators, we first keep it as a "logical topology" (represented as GraphNode inside InternalStreamsBuilder), and then only execute the optimization and the construction of the "physical" Topology when StreamsBuilder.build() is called.
>>> Back to your question, I think it makes more sense to add a new type of StreamsGraphNode (maybe you can consider inheriting from the BaseJoinProcessorNode). Note that although in the Topology we will have multiple connected ProcessorNodes to represent a (foreign-key) join, we still want to keep it as a single StreamsGraphNode, or just a couple of them, in the logical representation, so that in the future we can construct the physical topology differently (e.g. having another way than the current distributed hash-join).
>>> -------------------------------------------------------
>>> Back to your questions on KIP-213, I think Jan has summarized it pretty well. Note that back then we did not have headers support, so we had to take such a "key-widening" approach to ensure ordering.
>>>
>>> Guozhang
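To make the header question above concrete: if the join's ordering metadata (e.g. the source offset in Adam's ("foreignKeyOffset", 10) example) were carried in a record header, one way to reduce the collision risk with user-defined headers is to reserve a prefixed header name for the join's internal use and strip it before records reach user code. A minimal sketch against the kafka-clients Headers API; the header name and helper class are illustrative assumptions, not part of KIP-213 as written:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class ForeignKeyOffsetHeader {

        // Hypothetical reserved name; the prefix marks it as internal to the join.
        private static final String NAME = "__kip213.foreignKeyOffset";

        // Write (or overwrite) the internal header without touching user headers.
        public static void set(Headers headers, long offset) {
            headers.remove(NAME);
            headers.add(NAME, ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
        }

        // Read the internal header; -1 if absent.
        public static long get(Headers headers) {
            Header h = headers.lastHeader(NAME);
            return h == null ? -1L : ByteBuffer.wrap(h.value()).getLong();
        }

        public static void main(String[] args) {
            Headers headers = new RecordHeaders();
            // A user header with the "natural" name keeps its own value.
            headers.add("foreignKeyOffset", "user-owned".getBytes(StandardCharsets.UTF_8));
            set(headers, 10L);
            System.out.println(get(headers));                                   // 10
            System.out.println(headers.lastHeader("foreignKeyOffset") != null); // true: untouched
        }
    }

Whether Streams should reserve such an internal header namespace at all is exactly the open question in this part of the thread.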
>>>
>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>
>>>> Hi Adam,
>>>> I love how you are on to this already! I resolve this by "key-widening": I treat the results of FK A and FK B differently. As you can see, the output of my join has a combined key, and therefore I can resolve the "race condition" in a group-by if I so desire.
>>>> I think this reflects more what happens under the hood and makes it clearer to the user what is going on. The idea of hiding this behind metadata and handling it in the DSL is, from my POV, not ideal.
>>>> To write it into your example:
>>>>
>>>> (key + A, null)
>>>> (key + B, <joined on FK = B>)
>>>>
>>>> is what my output would look like.
>>>> Hope that makes sense :D
>>>> Best Jan
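Jan's widened output can be made concrete with a small composite-key type. The sketch below is illustrative only (the class name and serialization format are assumptions, not Jan's or the KIP's actual code); the point is that serializing the foreign key first keeps all entries for one foreign key contiguous in a byte-ordered store, while the primary key disambiguates the two results in his example:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Illustrative composite key: foreign key first, primary key second.
    public final class CombinedKey {
        public final String foreignKey;
        public final String primaryKey;

        public CombinedKey(String foreignKey, String primaryKey) {
            this.foreignKey = foreignKey;
            this.primaryKey = primaryKey;
        }

        // Serialize foreign-key-first, length-prefixed so "A" + "BC" can never
        // collide with "AB" + "C". All entries sharing a foreign key therefore
        // share a byte prefix and sort next to each other.
        public byte[] serialize() {
            byte[] fk = foreignKey.getBytes(StandardCharsets.UTF_8);
            byte[] pk = primaryKey.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(Integer.BYTES + fk.length + pk.length)
                    .putInt(fk.length).put(fk).put(pk).array();
        }

        @Override
        public String toString() {
            return "(" + primaryKey + " + " + foreignKey + ")";
        }

        public static void main(String[] args) {
            // Jan's two output records, distinguishable by their combined keys:
            System.out.println(new CombinedKey("A", "key") + " -> null");
            System.out.println(new CombinedKey("B", "key") + " -> <joined on FK = B>");
        }
    }

Note that Jan writes the pair as key + FK; whether the primary key or the foreign key comes first in the serialized form is a layout choice driven by whether a prefix scan by foreign key is needed.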
>>>>
>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>
>>>>> Hi Jan,
>>>>> If you do not use headers or other metadata, how do you ensure that changes to the foreign-key value are not resolved out of order? I.e.: if an event has FK = A, but you change it to FK = B, you need to propagate both a delete (FK = A -> null) and an addition (FK = B). In my solution, without maintaining any metadata, it is possible for the final output to be in either order - the correctly updated joined value, or the null for the delete:
>>>>>
>>>>> (key, null)
>>>>> (key, <joined on FK = B>)
>>>>>
>>>>> or
>>>>>
>>>>> (key, <joined on FK = B>)
>>>>> (key, null)
>>>>>
>>>>> I looked back through your code and through the discussion threads, and didn't see any information on how you resolved this. I have a version of my code working for 2.0; I am just adding more integration tests and will update the KIP accordingly. Any insight you could provide on resolving out-of-order semantics without metadata would be helpful.
>>>>> Thanks,
>>>>> Adam
>>>>>
>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Happy to see that you want to make an effort here.
>>>>>> Regarding the ProcessorSuppliers, I couldn't find a way to not rewrite the joiners plus the merger. The re-partitioners can be reused in theory. I don't know if repartitioning is optimized in 2.0 now.
>>>>>> I made https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics back then, and we are running KIP-213 with KIP-241 in combination.
>>>>>> For us it is vital, as it minimized the size we had in our repartition topics and it removed the factor of 2 in events on every message. I know about the new "delete once the consumer has read it". I don't think 241 is vital for all use cases, but for ours it is. I wanted to use 213 to sneak in the foundations for 241 as well.
>>>>>> I don't quite understand what a PropagationWrapper is, but I am certain that you do not need RecordHeaders for 213, and I would try to leave them out. They either belong to the DSL or to the user; a mixed use is to be avoided. We run the join with the 0.8 log format and I don't think one needs more.
>>>>>> This KIP will be very valuable for the streams project! I could never convince myself to invest in the 1.0+ DSL, as I used almost all my energy to fight against it. Maybe this can also help me see the good sides a little bit more.
>>>>>> If there is anything unclear with all the text that has been written, feel free to just directly cc me so I don't miss it on the mailing list.
>>>>>> Best Jan
>>>>>>
>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>
>>>>>>> More followup, and +dev, as Guozhang replied to me directly previously.
>>>>>>> I am currently porting the code over to trunk. One of the major changes since 1.0 is the usage of GraphNodes. I have a question about this: for a foreignKey joiner, should it have its own dedicated node type? Or would it be advisable to construct it from existing GraphNode components? For instance, I believe I could construct it from several OptimizableRepartitionNode, some SinkNode, some SourceNode, and several StatefulProcessorNode. That being said, there is some underlying complexity to each approach.
>>>>>>> I will be switching KIP-213 to use the RecordHeaders in Kafka Streams instead of the PropagationWrapper, but conceptually it should be the same.
>>>>>>> Again, any feedback is welcomed...
>>>>>>>
>>>>>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang et al,
>>>>>>>> I was just reading the 2.0 release notes and noticed a section on Record Headers: https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>>>>>> I am not yet sure if the contents of a RecordHeader are propagated all the way through the Sinks and Sources, but if they are, and if they remain attached to the record (including null records), I may be able to ditch the PropagationWrapper for an implementation using RecordHeader. I am not yet sure if this is doable, so if anyone understands the RecordHeader implementation better than I do, I would be happy to hear from you.
>>>>>>>> In the meantime, let me know of any questions. I believe this PR has a lot of potential to solve problems for other people, as I have encountered a number of other companies in the wild all home-brewing their own solutions to come up with a method of handling relational data in streams.
>>>>>>>> Adam
>>>>>>>>
>>>>>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Adam,
>>>>>>>>> Thanks for rebooting the discussion of this KIP! Let me finish my pass on the wiki and get back to you soon. Sorry for the delays.
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Let me kick this off with a few starting points that I would like to generate some discussion on.
>>>>>>>>>> 1) It seems to me that I will need to repartition the data twice - once on the foreign key, and once back to the primary key. Is there anything I am missing here?
>>>>>>>>>> 2) I believe I will also need to materialize 3 state stores: the prefixScan SS, the highwater mark SS (for out-of-order resolution), and the final state store, due to the workflow I have laid out. I have not thought of a better way yet, but would appreciate any input on this matter. I have gone back through the mailing list for the previous discussions on this KIP, and I did not see anything relating to resolving out-of-order compute. I cannot see a way around the current three-SS structure that I have.
>>>>>>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how to resolve the iterator obtained from RocksDB with that of the cache. In addition, I must ensure everything is flushed before scanning. Since the materialized prefixScan SS is under "control" of the function, I do not anticipate this to be a problem. Performance throughput will need to be tested, but as Jan observed in his initial overview of this issue, it is generally a surge of output events which affects performance, more so than the flush or the prefixScan itself.
>>>>>>>>>> Thoughts on any of these are greatly appreciated, since these elements are really the cornerstone of the whole design. I can put up the code I have written against 1.0.2 if we so desire, but first I was hoping to just tackle some of the fundamental design proposals.
>>>>>>>>>> Thanks,
>>>>>>>>>> Adam
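To illustrate point 2: one plausible shape for the highwater-mark resolution is to tag every propagated join result with the offset of the source update that produced it, and to keep the largest offset already emitted per primary key; anything older is dropped, so a late-arriving delete cannot clobber a newer joined value. A toy sketch follows; a HashMap stands in for the highwater mark state store, and the exact comparison rule in the KIP may differ:

    import java.util.HashMap;
    import java.util.Map;

    // Toy version of the out-of-order resolution described in point 2. In Kafka
    // Streams the map would be a persistent KeyValueStore keyed by primary key.
    public class HighwaterMarkResolver {

        private final Map<String, Long> highwater = new HashMap<>();

        // Forward a resolved join result only if it is at least as new as anything
        // already emitted for this primary key. Ties would need an extra rule
        // (e.g. "does this result match the current foreign key?"); omitted here.
        public boolean shouldForward(String primaryKey, long sourceOffset) {
            Long seen = highwater.get(primaryKey);
            if (seen != null && sourceOffset < seen) {
                return false; // stale result, drop it
            }
            highwater.put(primaryKey, sourceOffset);
            return true;
        }

        public static void main(String[] args) {
            HighwaterMarkResolver resolver = new HighwaterMarkResolver();
            // FK changed from A to B; the join against FK = B happens to arrive first...
            System.out.println(resolver.shouldForward("key", 42L)); // true  -> (key, <joined on FK = B>)
            // ...and the delete derived from the old FK = A subscription arrives late.
            System.out.println(resolver.shouldForward("key", 41L)); // false -> stale (key, null) is dropped
        }
    }
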
That being said, I also need to add >>>>>>>>>> some >>>>>>>>>> >>>>>>>>>>> clearer >>>>>>>>>>> >>>>>>>>>> integration tests (I did a lot of testing using a non-Kafka >>>>>>>>>> Streams >>>>>>>>>> >>>>>>>>>>> framework) and clean up the code a bit more before putting it in >>>>>>>>>>> a >>>>>>>>>>> PR >>>>>>>>>>> against trunk (I can do so later this week likely). >>>>>>>>>>> >>>>>>>>>>> Please take a look, >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> >>>>>>>>>>> Adam Bellemare >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>> -- Guozhang >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>> -- Guozhang >>> >>> >