Hi Guozhang,

By "workflow" I mean the overall process by which the KIP is implemented: ideas for reducing the topic count and the number of materializations, whether there is a better way to resolve out-of-order results than a highwater mark table, whether the design philosophy of “keep everything encapsulated within the join function” is appropriate, and so on. I can implement the changes that John suggested, but if my overall workflow is not acceptable I would rather address that before making minor changes.
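The highwater mark table mentioned above can be sketched as follows. The sketch assumes (the thread does not confirm this) that every join result carries the offset of the primary-key-side update that produced it. This is similar to the "foreignKeyOffset" header Adam mentions in his 15.08.2018 message. The final stage remembers the highest offset already emitted per primary key and drops any result derived from an older update. The class and method names are hypothetical, and a plain dict stands in for a Kafka Streams state store.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class HighwaterResolver:
    """Hypothetical final stage of a foreign-key join that drops stale results.

    Assumption: each incoming result is tagged with the offset of the
    primary-key-side update that produced it.
    """

    # Highest offset emitted so far, per primary key (the "highwater mark table").
    highwater: Dict[str, int] = field(default_factory=dict)
    # Results forwarded downstream, in emission order.
    emitted: List[Tuple[str, Optional[Any]]] = field(default_factory=list)

    def process(self, key: str, offset: int, value: Optional[Any]) -> bool:
        """Forward (key, value) unless it stems from an older update; return True if forwarded."""
        if offset < self.highwater.get(key, -1):
            return False  # a result from a newer update of this key was already emitted
        self.highwater[key] = offset
        self.emitted.append((key, value))
        return True


if __name__ == "__main__":
    resolver = HighwaterResolver()
    resolver.process("key", 1, "joined on FK = A")        # update at offset 1 set FK = A
    resolver.process("key", 2, "joined on FK = B")        # update at offset 2 changed FK to B
    resolver.process("key", 1, "late result for FK = A")  # arrives late, so it is dropped
    print(resolver.emitted)  # [('key', 'joined on FK = A'), ('key', 'joined on FK = B')]
```

In this sketch, equal offsets are deliberately let through. When only the foreign-key side changes, the re-joined result still carries the primary-key record's current offset and must not be discarded.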
If this requires a full candidate PR that is ready to go to production, I can make those changes. Hope that clears things up.

Thanks,
Adam

On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Adam,

What do you mean by "additional comments on the workflow"? Do you mean letting others review your PR https://github.com/apache/kafka/pull/5527? Is it ready for review?

Guozhang

On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Okay, I will implement John's suggestion of namespacing the external headers prior to processing, and then removing the namespacing prior to emitting. A potential future KIP could be to provide this namespacing automatically.

I would also appreciate any other additional comments on the workflow. My goal is to suss out agreement prior to moving to a vote.

On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I like John's idea as well. For this KIP specifically, since we do not expect any other consumers to read the repartition topics externally, we can add a short prefix to the header to be safe, while keeping the additional cost low (note that the header field is per-record, so any additional byte is per-record as well).

Guozhang

On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi John,

That is an excellent idea. The header usage I propose would be limited entirely to internal topics, and this could very well be the solution to potential conflicts. If we do not officially reserve a "__" prefix, then I think this would be the safest idea, as it would entirely avoid any accidents (for example, a company that uses its own "__" prefix for other reasons).
Thanks,
Adam

On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io> wrote:

Just a quick thought regarding headers:

> I think there is no absolutely safe way to avoid conflicts, but we can still consider using some name patterns to reduce the likelihood as much as possible, e.g. something like the internal topic naming: "__internal_[name]"?

I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think). For internal and changelog topics, we can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey

This is a lot of characters, so we could use a sigil instead (e.g., "_" for internal, "~" for external).

We simply apply the namespacing when we read user headers from external topics into the topology, and then de-namespace them before we emit them to an external topic (via "to" or "through"). Now it is not possible to collide with user-defined headers.

That said, I'd also be fine with just reserving "__" as a header prefix and not worrying about collisions.

Thanks for the KIP,
-John

On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

Still haven't completely grasped it. Sorry, will read more.

On 17.08.2018 21:23, Jan Filipiak wrote:

Cool stuff.

I made some random remarks. Did not touch the core of the algorithm yet.

Will do on Monday, 100%.

I don't see Interactive Queries :) like that!
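John's namespacing proposal above can be sketched as follows. The sketch models a record's headers as an ordered list of (key, bytes) pairs rather than using Kafka's actual Headers API. The function names are hypothetical, and the "_" and "~" sigils are the ones John suggests. Dropping internal headers on emit is an assumption of this sketch, not part of the proposal. Each sigil is a single ASCII character, so the namespacing adds one byte per header per record, which is the per-record cost Guozhang raises.

```python
from typing import List, Optional, Tuple

# A header is modelled as (key, value bytes); a record's headers are an ordered list.
Header = Tuple[str, bytes]

INTERNAL_SIGIL = "_"  # headers added by the join itself
EXTERNAL_SIGIL = "~"  # headers that came from the user's input topic


def namespace_on_read(headers: List[Header]) -> List[Header]:
    """Prefix every user header when a record enters the topology."""
    return [(EXTERNAL_SIGIL + key, value) for key, value in headers]


def add_internal(headers: List[Header], key: str, value: bytes) -> List[Header]:
    """Attach an internal header without touching any user header."""
    return headers + [(INTERNAL_SIGIL + key, value)]


def last_header(headers: List[Header], key: str) -> Optional[bytes]:
    """Return the value of the last header with exactly this key, if any."""
    for k, v in reversed(headers):
        if k == key:
            return v
    return None


def denamespace_on_emit(headers: List[Header]) -> List[Header]:
    """Restore user headers before writing to an external topic.

    Assumption: internal headers are dropped here, since only internal topics need them.
    """
    return [
        (key[len(EXTERNAL_SIGIL):], value)
        for key, value in headers
        if key.startswith(EXTERNAL_SIGIL)
    ]


if __name__ == "__main__":
    # The user already uses the header name the join wants to add.
    user_headers = [("foreignKeyOffset", b"user data")]
    inside = add_internal(namespace_on_read(user_headers), "foreignKeyOffset", b"10")
    print(last_header(inside, INTERNAL_SIGIL + "foreignKeyOffset"))  # b'10'
    print(denamespace_on_emit(inside))  # [('foreignKeyOffset', b'user data')]
```

A user header that happens to start with "_" is not mistaken for an internal one. It travels through the topology as "~_..." and comes out unchanged.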
On 17.08.2018 20:28, Adam Bellemare wrote:

I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Even before message headers, I always had the option to just wrap the messages in my own custom envelope, so of course I thought this through. One sentence in your last email triggered all the thought I put in back then to design it in what I think is the "Kafka way". It ended up as a little rant about what happened in the past.

I see plenty of my colleagues falling into traps in the API that I warned about during the 1.0 DSL rewrite. I have the same feeling again. So I hope it gives you some insight into my thought process. I am aware that since I never ported 213 to a newer Streams version, I don't really have a stake here, and initially I didn't feel like sending this at all. But maybe you can pull something good from it.

Best,
Jan

On 15.08.2018 04:44, Adam Bellemare wrote:

> @Jan
> Thanks Jan. I take it you mean "key-widening" somehow includes information about which record is processed first? I understand about a CombinedKey with both the foreign and primary key, but I don't see how you track ordering metadata in there unless you actually include a metadata field in the key type as well.
>
> @Guozhang
> As Jan mentioned earlier, are Record Headers meant to be used strictly in user space? It seems possible that a collision could occur on the (key, value) tuple I wish to add. For instance, if I wanted to add ("foreignKeyOffset", 10) to the Headers but the user had already specified their own header with the same key name, then it appears there would be a collision. (This is one of the issues I brought up in the KIP.)
>
> --------------------------------
>
> I will be posting a prototype PR against trunk within the next day or two. One thing I need to point out is that my design strictly wraps the entire foreignKeyJoin process within the DSL function. There is no exposure of CombinedKeys or widened keys, nothing to resolve with regard to out-of-order processing, and no need for the DSL user to even know what's going on inside the function. The code simply returns the results of the join, keyed by the original key. Currently my API exactly mirrors the format of the data returned by the regular join function, and I believe that this is very useful to many users of the DSL. It is my understanding that one of the main design goals of the DSL is to provide higher-level functionality without requiring users to know exactly what's going on under the hood.
> With this in mind, I thought it best to solve ordering and partitioning problems within the function and eliminate the requirement for users to do additional work after the fact to resolve the results of their join. Basically, I am assuming that most users of the DSL just "want it to work" and want it to be easy. I did this under the assumption that if a user truly wants to optimize their own workflow down to the finest details, they will stop relying solely on the DSL and move down to the Processor API.

I think the abstraction is not powerful enough to keep Kafka specifics from leaking up. The leak I currently see is that you cannot reliably prevent the delete from coming out first, before you emit the correct new record. After all, it is an abstraction built entirely around Kafka. I can only recommend against it. Honesty and simplicity should always be the first priority. Trying to hide this just makes it more complex and less understandable, and will lead to mistakes in usage.

This is exactly why I am also strongly against GraphNodes and later optimization stages. Can someone give me an example of an optimization that really can't be handled by the user constructing their topology differently? Having reusable Processor API components accessible from the DSL and composable as one likes is exactly where the DSL should max out, and KSQL should take the next step.
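The ordering hazard Jan describes can be illustrated by replaying two output records in every possible order against a table. The same test covers his key-widening alternative from his 13.08.2018 reply. This is a toy model in plain Python: a dict stands in for the downstream KTable, None stands in for a tombstone, and the function names are hypothetical. When the output is keyed only by primary key, the final state depends on arrival order. When it is keyed by (primary key, foreign key), it does not.

```python
from collections.abc import Hashable
from itertools import permutations
from typing import Dict, List, Optional, Tuple

Event = Tuple[Hashable, Optional[str]]


def apply_events(table: Dict[Hashable, str], events: List[Event]) -> Dict[Hashable, str]:
    """Apply upserts and tombstones (value None) to a copy of a table."""
    result = dict(table)
    for key, value in events:
        if value is None:
            result.pop(key, None)  # tombstone: delete the row
        else:
            result[key] = value
    return result


def distinct_outcomes(table: Dict[Hashable, str], events: List[Event]) -> int:
    """Count the different final tables produced by every arrival order of the events."""
    return len({frozenset(apply_events(table, list(order)).items()) for order in permutations(events)})


# FK changes from A to B: one delete from the A side, one join result from the B side.
keyed_by_pk = [("key", None), ("key", "joined on FK = B")]
keyed_by_pk_and_fk = [(("key", "A"), None), (("key", "B"), "joined on FK = B")]

print(distinct_outcomes({"key": "joined on FK = A"}, keyed_by_pk))  # 2
print(distinct_outcomes({("key", "A"): "joined on FK = A"}, keyed_by_pk_and_fk))  # 1
```

The widened table still has to be re-keyed by primary key (the group-by Jan mentions) before it looks like the output of a regular join. That is the extra step Adam's design hides inside the DSL function.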
From a software engineering standpoint, I find it very unprofessional to run software whose libraries' inner workings you cannot at least sensibly reason about. Given that people have to read and understand this anyway, why try to hide it?

I really miss the beauty of the 0.10 DSL. Apparently it is not something I can influence, only warn about.

@Guozhang
You can't imagine how many extra IQ state stores I constantly prune from Streams apps because people just keep passing Materialized into all the operations. :D :'-( I regret that I couldn't convince you guys back then. Plus this whole topology-as-a-floating-interface-chain approach: I have never seen it anywhere else. :-/ :'(

I don't know. I guess this is just me regretting that I only have 24h/day.

> I updated the KIP today with some points worth talking about, should anyone be so inclined to check it out. We are currently running this code in production to handle relational joins from our Kafka Connect topics, as per the original motivation of the KIP.
>
> I believe the foreignKeyJoin should be responsible for. In my

On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

As for your question regarding GraphNodes, they are for extending the Streams optimization framework. You can find more details at https://issues.apache.org/jira/browse/KAFKA-6761.
The main idea is this. Instead of directly building up the "physical topology" while users are specifying the transformation operators, we first keep it as a "logical topology". The physical topology is represented as Topology in the public package and built internally as the ProcessorTopology class. The logical topology is represented as GraphNodes inside InternalStreamsBuilder. We then execute the optimization and construct the "physical" Topology only when StreamsBuilder.build() is called.

Back to your question, I think it makes more sense to add a new type of StreamsGraphNode (maybe you can consider inheriting from BaseJoinProcessorNode). In the Topology we will have multiple connected ProcessorNodes to represent a (foreign-key) join. Even so, we still want to keep it as a single StreamsGraphNode, or just a couple of them, in the logical representation. That way, in the future we can construct the physical topology differently (e.g. with an approach other than the current distributed hash-join).

-------------------------------------------------------

Back to your questions about KIP-213, I think Jan has summarized it pretty well. Note that back then we did not have header support, so we had to use the "key-widening" approach to ensure ordering.

Guozhang

On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Adam,

I love how you are on to this already!
I resolve this by "key-widening": I treat the results for FK = A and FK = B differently. As you can see, the output of my join has a combined key, and therefore I can resolve the "race condition" in a group-by if I so desire.

I think this better reflects what happens under the hood and makes it clearer to the user what is going on. From my point of view, hiding this behind metadata and handling it in the DSL is not ideal.

To put it in terms of your example:

(key + A, null)
(key + B, <joined on FK = B>)

is what my output would look like.

Hope that makes sense :D

Best,
Jan

On 13.08.2018 18:16, Adam Bellemare wrote:

Hi Jan,

If you do not use headers or other metadata, how do you ensure that changes to the foreign-key value are not resolved out of order? That is: if an event has FK = A, but you change it to FK = B, you need to propagate both a delete (FK = A -> null) and an addition (FK = B). In my solution, without maintaining any metadata, the final output can arrive in either order: the correctly updated joined value, or the null for the delete.
(key, null)
(key, <joined on FK = B>)

or

(key, <joined on FK = B>)
(key, null)

I looked back through your code and through the discussion threads, and didn't see any information on how you resolved this. I have a version of my code working for 2.0; I am just adding more integration tests and will update the KIP accordingly. Any insight you could provide on resolving out-of-order semantics without metadata would be helpful.

Thanks,
Adam

On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,

Happy to see that you want to make an effort here.

Regarding the ProcessorSuppliers, I couldn't find a way to avoid rewriting the joiners + the merger. The re-partitioners can be reused in theory. I don't know if repartitioning is optimized in 2.0 now. Back then I wrote https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics, and we are running KIP-213 in combination with KIP-241.

For us it is vital: it minimized the size of our repartition topics and removed the factor of 2 in events on every message. I know about the new "delete once consumer has read it" feature. I don't think 241 is vital for all use cases, but for ours it is.
I wanted to use 213 to sneak in the foundations for 241 as well.

I don't quite understand what a PropagationWrapper is, but I am certain that you do not need RecordHeaders for 213, and I would try to leave them out. They belong either to the DSL or to the user; mixed use is to be avoided. We run the join with the 0.8 log format and I don't think one needs more.

This KIP will be very valuable for the Streams project! I could never convince myself to invest in the 1.0+ DSL, as I used almost all my energy to fight against it. Maybe this can also help me see its good sides a little more.

If anything in all the text that has been written is unclear, feel free to cc me directly so I don't miss it on the mailing list.

Best,
Jan

On 08.08.2018 15:26, Adam Bellemare wrote:

More follow-up, and +dev, since Guozhang previously replied to me directly.

I am currently porting the code over to trunk. One of the major changes since 1.0 is the use of GraphNodes. I have a question about this: should a foreign-key joiner have its own dedicated node type, or would it be advisable to construct it from existing GraphNode components?
For instance, I believe I could construct it from several OptimizableRepartitionNode instances, some SinkNode and SourceNode instances, and several StatefulProcessorNode instances. That being said, each approach has some underlying complexity.

I will be switching KIP-213 to use the RecordHeaders in Kafka Streams instead of the PropagationWrapper, but conceptually it should be the same.

Again, any feedback is welcome...

On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Guozhang et al.,

I was just reading the 2.0 release notes and noticed a section on Record Headers:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API

I am not yet sure whether the contents of a RecordHeader are propagated all the way through the Sinks and Sources. If they are, and they remain attached to the record (including null records), I may be able to ditch the PropagationWrapper for an implementation using RecordHeader. I am not yet sure if this is doable, so if anyone understands the RecordHeader implementation better than I do, I would be happy to hear from you.
In the meantime, let me know of any questions. I believe this PR has a lot of potential to solve problems for other people. I have encountered a number of other companies in the wild, all home-brewing their own solutions for handling relational data in streams.

Adam

On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

Thanks for rebooting the discussion of this KIP! Let me finish my pass on the wiki and get back to you soon. Sorry for the delays.

Guozhang

On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Let me kick this off with a few starting points that I would like to generate some discussion on.

1) It seems to me that I will need to repartition the data twice: once on the foreign key, and once back to the primary key. Is there anything I am missing here?
2) I believe I will also need to materialize three state stores (SS), due to the workflow I have laid out: the prefixScan SS, the highwater mark SS (for out-of-order resolution) and the final state store. I have not thought of a better way yet, but would appreciate any input on this matter. I have gone back through the mailing list for the previous discussions on this KIP, and I did not see anything relating to resolving out-of-order computation. I cannot see a way around my current three-SS structure.

3) Caching is disabled on the prefixScan SS, as I do not know how to reconcile the iterator obtained from RocksDB with that of the cache. In addition, I must ensure everything is flushed before scanning. Since the materialized prefixScan SS is under the "control" of the function, I do not anticipate this being a problem.
Throughput will need to be tested. However, as Jan observed in his initial overview of this issue, performance is generally affected more by a surge of output events than by the flush or the prefixScan itself.

Thoughts on any of these are greatly appreciated, since these elements are really the cornerstone of the whole design. I can put up the code I have written against 1.0.2 if we so desire, but first I was hoping to tackle some of the fundamental design proposals.

Thanks,
Adam

On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Here is the new discussion thread for KIP-213. I picked the KIP back up because we at Flipp are now running this in production too. Jan started this last year, and I know that Trivago is also using something similar in production, at least in terms of APIs and functionality.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I do have an implementation of the code for Kafka 1.0.2 (our local production version), but I won't post it yet as I would like to focus on the workflow and design first. That being said, I also need to add some clearer integration tests (I did a lot of testing using a non-Kafka Streams framework). I also need to clean up the code a bit more before putting it in a PR against trunk, which I can likely do later this week.

Please take a look.

Thanks,

Adam Bellemare
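Points 1 and 2 of Adam's 24 July message hinge on the prefixScan store. The sketch below assumes the store is kept sorted and keyed by a combined (foreign key, primary key) pair, as in the CombinedKey idea discussed in the thread. When a record on the foreign-key side changes, one range scan finds every primary key that references it. Each re-joined result is then keyed by primary key again, which corresponds to the second repartition. A sorted Python list with bisect stands in for RocksDB, and the class and function names are hypothetical.

```python
import bisect
from typing import Dict, List, Tuple

CombinedKey = Tuple[str, str]  # (foreign key, primary key)


class PrefixScanStore:
    """Hypothetical sorted store keyed by (foreign key, primary key), standing in for RocksDB."""

    def __init__(self) -> None:
        self._keys: List[CombinedKey] = []  # kept sorted, so one foreign key is one contiguous range
        self._values: Dict[CombinedKey, str] = {}

    def put(self, fk: str, pk: str, value: str) -> None:
        key = (fk, pk)
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def delete(self, fk: str, pk: str) -> None:
        key = (fk, pk)
        if key in self._values:
            del self._values[key]
            self._keys.pop(bisect.bisect_left(self._keys, key))

    def prefix_scan(self, fk: str) -> List[Tuple[str, str]]:
        """Return (primary key, value) for every entry whose foreign key equals fk."""
        start = bisect.bisect_left(self._keys, (fk, ""))  # (fk, "") sorts first within the range
        matches = []
        for key in self._keys[start:]:
            if key[0] != fk:
                break  # left the contiguous range for this foreign key
            matches.append((key[1], self._values[key]))
        return matches


def on_foreign_update(store: PrefixScanStore, fk: str, fk_value: str) -> List[Tuple[str, Tuple[str, str]]]:
    """Re-join every subscribed primary-key record; results are keyed by primary key again."""
    return [(pk, (left, fk_value)) for pk, left in store.prefix_scan(fk)]


if __name__ == "__main__":
    store = PrefixScanStore()
    store.put("A", "order-1", "left value 1")
    store.put("B", "order-2", "left value 2")
    store.put("A", "order-3", "left value 3")
    print(on_foreign_update(store, "A", "right value A"))
```

Tuple keys sidestep a subtlety that a real byte-level store has to handle. If foreign and primary keys were simply concatenated as bytes, a scan for "A" would also match keys whose foreign key is "AB", so a length prefix or separator would be needed.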