I don't think Headers are meant for user-space only: in fact, KIP-258 also proposes relying on Headers for compatibility of changelog topics. But you raise a good point about how to avoid conflicting with the user header key space.
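To make the key-space concern concrete, here is a minimal sketch in plain Java with no Kafka dependency. The `HeaderKeys` class, its methods, and the reserved prefix are all hypothetical names chosen for illustration; this is not Kafka's `Headers` API, only a stand-in list of (key, value) pairs showing how a user's `foreignKeyOffset` header and an internal one could coexist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in for record headers: an ordered list of (key, value) pairs.
// This is NOT Kafka's Headers API; it only models the key-space idea.
final class HeaderKeys {
    // Hypothetical reserved prefix, modelled on internal topic naming.
    static final String INTERNAL_PREFIX = "__internal_";

    // Build the key under which an internal header would be stored.
    static String internal(String name) {
        return INTERNAL_PREFIX + name;
    }

    // True if a key falls inside the reserved (internal) key space.
    static boolean isReserved(String key) {
        return key.startsWith(INTERNAL_PREFIX);
    }

    // Return the first value stored under the given key, or null if absent.
    static String firstValue(List<Map.Entry<String, String>> headers, String key) {
        for (Map.Entry<String, String> h : headers) {
            if (h.getKey().equals(key)) {
                return h.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> headers = new ArrayList<>();
        // A user header and an internal header with the same logical name.
        headers.add(Map.entry("foreignKeyOffset", "user-value"));
        headers.add(Map.entry(internal("foreignKeyOffset"), "10"));

        // Each lookup finds its own entry because the stored keys differ.
        System.out.println(firstValue(headers, "foreignKeyOffset"));
        System.out.println(firstValue(headers, internal("foreignKeyOffset")));
    }
}
```

A prefix like this only lowers the chance of a clash: a user can still pick a key inside the reserved space unless something applies a check such as `isReserved` to user-supplied keys.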
I think there is no absolutely safe way to avoid conflicts, but we can still
consider using some name patterns to reduce the likelihood as much as
possible, e.g. something like the internal topics naming:
"__internal_[name]"? I'm cc'ing the contributor of KIP-258 to share thoughts
as well.

Guozhang

On Tue, Aug 14, 2018 at 7:44 PM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

> @Jan
> Thanks Jan. I take it you mean "key-widening" somehow includes information
> about which record is processed first? I understand about a CombinedKey
> with both the Foreign and Primary key, but I don't see how you track
> ordering metadata in there unless you actually included a metadata field in
> the key type as well.
>
> @Guozhang
> As Jan mentioned earlier, are Record Headers meant to be used strictly in
> just the user-space? It seems that it is possible that a collision on the
> (key,value) tuple I wish to add to it could occur. For instance, if I
> wanted to add a ("foreignKeyOffset",10) to the Headers but the user already
> specified their own header with the same key name, then it appears there
> would be a collision. (This is one of the issues I brought up in the KIP).
>
> --------------------------------
>
> I will be posting a prototype PR against trunk within the next day or two.
> One thing I need to point out is that my design very strictly wraps the
> entire foreignKeyJoin process entirely within the DSL function. There is no
> exposure of CombinedKeys or widened keys, nothing to resolve with regards
> to out-of-order processing and no need for the DSL user to even know what's
> going on inside of the function. The code simply returns the results of the
> join, keyed by the original key. Currently my API mirrors identically the
> format of the data returned by the regular join function, and I believe
> that this is very useful to many users of the DSL.
> It is my understanding that one of the main design goals of the DSL is to
> provide higher level functionality without requiring the users to know
> exactly what's going on under the hood. With this in mind, I thought it best
> to solve ordering and partitioning problems within the function and
> eliminate the requirement for users to do additional work after the fact to
> resolve the results of their join. Basically, I am assuming that most users
> of the DSL just "want it to work" and want it to be easy. I did this
> operating under the assumption that if a user truly wants to optimize their
> own workflow down to the finest details then they will break from strictly
> using the DSL and move down to the processors API.
>
> I updated the KIP today with some points worth talking about, should anyone
> be so inclined to check it out. Currently we are running this code in
> production to handle relational joins from our Kafka Connect topics, as per
> the original motivation of the KIP.
>
> I believe the foreignKeyJoin should be responsible for. In my
>
> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > As for your question regarding GraphNodes, it is for extending Streams
> > optimization framework. You can find more details on
> > https://issues.apache.org/jira/browse/KAFKA-6761.
> >
> > The main idea is that instead of directly building up the "physical
> > topology" (represented as Topology in the public package, and internally
> > built as the ProcessorTopology class) while users are specifying the
> > transformation operators, we first keep it as a "logical topology"
> > (represented as GraphNode inside InternalStreamsBuilder). And then only
> > execute the optimization and the construction of the "physical" Topology
> > when StreamsBuilder.build() is called.
> >
> > Back to your question, I think it makes more sense to add a new type of
> > StreamsGraphNode (maybe you can consider inheriting from the
> > BaseJoinProcessorNode). Note that although in the Topology we will have
> > multiple connected ProcessorNodes to represent a (foreign-key) join, we
> > still want to keep it as a single StreamsGraphNode, or just a couple of
> > them in the logical representation so that in the future we can construct
> > the physical topology differently (e.g. having another way than the
> > current distributed hash-join).
> >
> > -------------------------------------------------------
> >
> > Back to your questions to KIP-213, I think Jan has summarized it
> > pretty well. Note that back then we did not have headers support so we
> > had to use such a "key-widening" approach to ensure ordering.
> >
> > Guozhang
> >
> > On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >
> > > Hi Adam,
> > >
> > > I love how you are on to this already! I resolve this by "key-widening":
> > > I treat the results of FKA and FKB differently. As you can see the
> > > output of my join has a Combined Key and therefore I can resolve the
> > > "race condition" in a group by if I so desire.
> > >
> > > I think this reflects more what happens under the hood and makes it more
> > > clear to the user what is going on. The idea of hiding this behind
> > > metadata and handling it in the DSL is from my POV unideal.
> > >
> > > To write into your example:
> > >
> > > (key + A, null)
> > > (key + B, <joined On FK =B>)
> > >
> > > is what my output would look like.
> > >
> > > Hope that makes sense :D
> > >
> > > Best Jan
> > >
> > > On 13.08.2018 18:16, Adam Bellemare wrote:
> > >
> > >> Hi Jan
> > >>
> > >> If you do not use headers or other metadata, how do you ensure that
> > >> changes to the foreign-key value are not resolved out-of-order?
> > >> ie: If an event has FK = A, but you change it to FK = B, you need to
> > >> propagate both a delete (FK=A -> null) and an addition (FK=B). In my
> > >> solution, without maintaining any metadata, it is possible for the
> > >> final output to be in either order - the correctly updated joined
> > >> value, or the null for the delete.
> > >>
> > >> (key, null)
> > >> (key, <joined On FK =B>)
> > >>
> > >> or
> > >>
> > >> (key, <joined On FK =B>)
> > >> (key, null)
> > >>
> > >> I looked back through your code and through the discussion threads, and
> > >> didn't see any information on how you resolved this. I have a version
> > >> of my code working for 2.0, I am just adding more integration tests and
> > >> will update the KIP accordingly. Any insight you could provide on
> > >> resolving out-of-order semantics without metadata would be helpful.
> > >>
> > >> Thanks
> > >> Adam
> > >>
> > >> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> Happy to see that you want to make an effort here.
> > >>>
> > >>> Regarding the ProcessSuppliers I couldn't find a way to not rewrite
> > >>> the joiners + the merger. The re-partitioners can be reused in theory.
> > >>> I don't know if repartition is optimized in 2.0 now.
> > >>>
> > >>> I made this
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
> > >>> back then and we are running KIP-213 with KIP-241 in combination.
> > >>>
> > >>> For us it is vital as it minimized the size we had in our repartition
> > >>> topics plus it removed the factor of 2 in events on every message.
> > >>> I know about this new "delete once consumer has read it". I don't
> > >>> think 241 is vital for all use cases, for ours it is. I wanted to use
> > >>> 213 to sneak in the foundations for 241 as well.
> > >>>
> > >>> I don't quite understand what a PropagationWrapper is, but I am
> > >>> certain that you do not need RecordHeaders for 213 and I would try to
> > >>> leave them out. They either belong to the DSL or to the user, having a
> > >>> mixed use is to be avoided. We run the join with 0.8 logformat and I
> > >>> don't think one needs more.
> > >>>
> > >>> This KIP will be very valuable for the streams project! I could never
> > >>> convince myself to invest into the 1.0+ DSL as I used almost all my
> > >>> energy to fight against it. Maybe this can also help me see the good
> > >>> sides a little bit more.
> > >>>
> > >>> If there is anything unclear with all the text that has been written,
> > >>> feel free to just directly cc me so I don't miss it on the mailing
> > >>> list.
> > >>>
> > >>> Best Jan
> > >>>
> > >>> On 08.08.2018 15:26, Adam Bellemare wrote:
> > >>>
> > >>>> More followup, and +dev as Guozhang replied to me directly previously.
> > >>>>
> > >>>> I am currently porting the code over to trunk. One of the major
> > >>>> changes since 1.0 is the usage of GraphNodes. I have a question about
> > >>>> this:
> > >>>>
> > >>>> For a foreignKey joiner, should it have its own dedicated node type?
> > >>>> Or would it be advisable to construct it from existing GraphNode
> > >>>> components? For instance, I believe I could construct it from several
> > >>>> OptimizableRepartitionNode, some SinkNode, some SourceNode, and
> > >>>> several StatefulProcessorNode. That being said, there is some
> > >>>> underlying complexity to each approach.
> > >>>>
> > >>>> I will be switching the KIP-213 to use the RecordHeaders in Kafka
> > >>>> Streams instead of the PropagationWrapper, but conceptually it should
> > >>>> be the same.
> > >>>>
> > >>>> Again, any feedback is welcomed...
> > >>>>
> > >>>>
> > >>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Guozhang et al
> > >>>>>
> > >>>>> I was just reading the 2.0 release notes and noticed a section on
> > >>>>> Record Headers.
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
> > >>>>>
> > >>>>> I am not yet sure if the contents of a RecordHeader is propagated
> > >>>>> all the way through the Sinks and Sources, but if it is, and if it
> > >>>>> remains attached to the record (including null records) I may be
> > >>>>> able to ditch the propagationWrapper for an implementation using
> > >>>>> RecordHeader. I am not yet sure if this is doable, so if anyone
> > >>>>> understands RecordHeader impl better than I, I would be happy to
> > >>>>> hear from you.
> > >>>>>
> > >>>>> In the meantime, let me know of any questions. I believe this PR has
> > >>>>> a lot of potential to solve problems for other people, as I have
> > >>>>> encountered a number of other companies in the wild all home-brewing
> > >>>>> their own solutions to come up with a method of handling relational
> > >>>>> data in streams.
> > >>>>>
> > >>>>> Adam
> > >>>>>
> > >>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hello Adam,
> > >>>>>>
> > >>>>>> Thanks for rebooting the discussion of this KIP ! Let me finish my
> > >>>>>> pass on the wiki and get back to you soon. Sorry for the delays..
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Let me kick this off with a few starting points that I would like
> > >>>>>>> to generate some discussion on.
> > >>>>>>>
> > >>>>>>> 1) It seems to me that I will need to repartition the data twice -
> > >>>>>>> once on the foreign key, and once back to the primary key. Is
> > >>>>>>> there anything I am missing here?
> > >>>>>>>
> > >>>>>>> 2) I believe I will also need to materialize 3 state stores: the
> > >>>>>>> prefixScan SS, the highwater mark SS (for out-of-order resolution)
> > >>>>>>> and the final state store, due to the workflow I have laid out. I
> > >>>>>>> have not thought of a better way yet, but would appreciate any
> > >>>>>>> input on this matter. I have gone back through the mailing list
> > >>>>>>> for the previous discussions on this KIP, and I did not see
> > >>>>>>> anything relating to resolving out-of-order compute. I cannot see
> > >>>>>>> a way around the current three-SS structure that I have.
> > >>>>>>>
> > >>>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how
> > >>>>>>> to resolve the iterator obtained from rocksDB with that of the
> > >>>>>>> cache. In addition, I must ensure everything is flushed before
> > >>>>>>> scanning. Since the materialized prefixScan SS is under "control"
> > >>>>>>> of the function, I do not anticipate this to be a problem.
> > >>>>>>> Performance throughput will need to be tested, but as Jan observed
> > >>>>>>> in his initial overview of this issue, it is generally a surge of
> > >>>>>>> output events which affect performance moreso than the flush or
> > >>>>>>> prefixScan itself.
> > >>>>>>>
> > >>>>>>> Thoughts on any of these are greatly appreciated, since these
> > >>>>>>> elements are really the cornerstone of the whole design.
> > >>>>>>> I can put up the code I have written against 1.0.2 if we so
> > >>>>>>> desire, but first I was hoping to just tackle some of the
> > >>>>>>> fundamental design proposals.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Adam
> > >>>>>>>
> > >>>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Here is the new discussion thread for KIP-213. I picked back up
> > >>>>>>>> on the KIP as this is something that we too at Flipp are now
> > >>>>>>>> running in production. Jan started this last year, and I know
> > >>>>>>>> that Trivago is also using something similar in production, at
> > >>>>>>>> least in terms of APIs and functionality.
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > >>>>>>>>
> > >>>>>>>> I do have an implementation of the code for Kafka 1.0.2 (our
> > >>>>>>>> local production version) but I won't post it yet as I would like
> > >>>>>>>> to focus on the workflow and design first. That being said, I
> > >>>>>>>> also need to add some clearer integration tests (I did a lot of
> > >>>>>>>> testing using a non-Kafka Streams framework) and clean up the
> > >>>>>>>> code a bit more before putting it in a PR against trunk (I can do
> > >>>>>>>> so later this week likely).
> > >>>>>>>>
> > >>>>>>>> Please take a look,
> > >>>>>>>>
> > >>>>>>>> Thanks
> > >>>>>>>>
> > >>>>>>>> Adam Bellemare
> > >>>>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> >
> > --
> > -- Guozhang
>

--
-- Guozhang