Hi Jan,

If you do not use headers or other metadata, how do you ensure that changes
to the foreign-key value are not resolved out of order? That is, if an event
has FK = A and you change it to FK = B, you need to propagate both a delete
(FK = A -> null) and an addition (FK = B). In my solution, without
maintaining any metadata, it is possible for the final output to arrive in
either order - the correctly updated joined value, or the null for the
delete.
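To make the ambiguity concrete, here is a minimal Python sketch (all names
are illustrative, not from my actual code) of how a single FK change fans
out into two events whose arrival order alone decides the final value:

```python
def propagate_fk_change(key, new_fk, rhs_table):
    """An FK update emits two events: a retraction against the old FK
    (resolved to null) and a joined result against the new FK."""
    retraction = (key, None)                       # delete: FK=A -> null
    update = (key, "joined|" + rhs_table[new_fk])  # addition: joined on FK=B
    return [retraction, update]

def apply_events(events):
    """Last writer wins: the final state depends entirely on arrival order."""
    state = {}
    for k, v in events:
        state[k] = v
    return state

rhs = {"B": "rhs-value-for-B"}
events = propagate_fk_change("key", "B", rhs)

correct = apply_events(events)      # retraction arrives first, then the join
stale = apply_events(events[::-1])  # join arrives first, then the retraction
```

With no metadata to order them, `correct` ends holding the joined value
while `stale` ends holding null, which are exactly the two possible outputs.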
That is, the final output could be either:

  (key, null), (key, <joined on FK = B>)
or
  (key, <joined on FK = B>), (key, null)

I looked back through your code and through the discussion threads, and
didn't see any information on how you resolved this. I have a version of my
code working for 2.0; I am just adding more integration tests and will
update the KIP accordingly. Any insight you could provide on resolving
out-of-order semantics without metadata would be helpful.

Thanks

Adam

On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi,
>
> Happy to see that you want to make an effort here.
>
> Regarding the ProcessorSuppliers, I couldn't find a way to not rewrite
> the joiners + the merger. The re-partitioners can be reused in theory. I
> don't know if repartitioning is optimized in 2.0 now.
>
> I made
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
> back then, and we are running KIP-213 with KIP-241 in combination.
>
> For us it is vital, as it minimized the size of our repartition topics
> and removed the factor of 2 in events on every message. I know about the
> new "delete once consumer has read it" feature. I don't think 241 is
> vital for all use cases, but for ours it is. I wanted to use 213 to sneak
> in the foundations for 241 as well.
>
> I don't quite understand what a PropagationWrapper is, but I am certain
> that you do not need RecordHeaders for 213, and I would try to leave them
> out. They either belong to the DSL or to the user; mixed use is to be
> avoided. We run the join with the 0.8 log format and I don't think one
> needs more.
>
> This KIP will be very valuable for the Streams project! I could never
> convince myself to invest in the 1.0+ DSL, as I used almost all my energy
> fighting against it. Maybe this can also help me see the good sides a
> little bit more.
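[Replying inline, since you ask what a PropagationWrapper is: roughly the
following, sketched in Python with illustrative field names (the real class
is Java and may differ). The idea, consistent with the highwater-mark store
discussed further down this thread, is to carry the source record's offset
alongside the value so a downstream resolver can drop whichever of the two
events arrives stale:]

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PropagationWrapper:
    """Hypothetical wrapper: the joined value plus the offset of the
    originating left-hand-side record, used to order updates per key."""
    offset: int
    value: Optional[str]  # None represents a retraction (FK -> null)

def resolve(highwater: dict, key: str, wrapped: PropagationWrapper):
    """Emit the update only if it is at least as new as the highest
    offset already seen for this key; otherwise drop it as stale."""
    if wrapped.offset < highwater.get(key, -1):
        return None  # stale out-of-order event, discard
    highwater[key] = wrapped.offset
    return (key, wrapped.value)
```

With this, a retraction stamped with offset 7 and a new joined value stamped
with offset 8 resolve to the same final state regardless of arrival order,
at the cost of materializing the highwater store.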
> If there is anything unclear in all the text that has been written, feel
> free to cc me directly so I don't miss it on the mailing list.
>
> Best,
> Jan
>
>
> On 08.08.2018 15:26, Adam Bellemare wrote:
>
>> More followup, and +dev as Guozhang replied to me directly previously.
>>
>> I am currently porting the code over to trunk. One of the major changes
>> since 1.0 is the usage of GraphNodes. I have a question about this:
>>
>> For a foreign-key joiner, should it have its own dedicated node type?
>> Or would it be advisable to construct it from existing GraphNode
>> components? For instance, I believe I could construct it from several
>> OptimizableRepartitionNode, SinkNode, SourceNode, and
>> StatefulProcessorNode instances. That being said, there is some
>> underlying complexity to each approach.
>>
>> I will be switching KIP-213 to use the RecordHeaders in Kafka Streams
>> instead of the PropagationWrapper, but conceptually it should be the
>> same.
>>
>> Again, any feedback is welcomed.
>>
>>
>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare
>> <adam.bellem...@gmail.com> wrote:
>>
>>> Hi Guozhang et al,
>>>
>>> I was just reading the 2.0 release notes and noticed a section on
>>> Record Headers:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>
>>> I am not yet sure if the contents of a RecordHeader are propagated all
>>> the way through the Sinks and Sources, but if they are, and if they
>>> remain attached to the record (including null records), I may be able
>>> to ditch the PropagationWrapper for an implementation using
>>> RecordHeader. I am not yet sure if this is doable, so if anyone
>>> understands the RecordHeader impl better than I do, I would be happy
>>> to hear from you.
>>>
>>> In the meantime, let me know of any questions.
>>> I believe this PR has a lot of potential to solve problems for other
>>> people, as I have encountered a number of other companies in the wild
>>> all home-brewing their own solutions for handling relational data in
>>> streams.
>>>
>>> Adam
>>>
>>>
>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Adam,
>>>>
>>>> Thanks for rebooting the discussion of this KIP! Let me finish my
>>>> pass on the wiki and get back to you soon. Sorry for the delays.
>>>>
>>>> Guozhang
>>>>
>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare
>>>> <adam.bellem...@gmail.com> wrote:
>>>>
>>>>> Let me kick this off with a few starting points that I would like to
>>>>> generate some discussion on.
>>>>>
>>>>> 1) It seems to me that I will need to repartition the data twice -
>>>>> once on the foreign key, and once back to the primary key. Is there
>>>>> anything I am missing here?
>>>>>
>>>>> 2) I believe I will also need to materialize 3 state stores: the
>>>>> prefixScan SS, the highwater mark SS (for out-of-order resolution),
>>>>> and the final state store, due to the workflow I have laid out. I
>>>>> have not thought of a better way yet, but would appreciate any input
>>>>> on this matter. I have gone back through the mailing list for the
>>>>> previous discussions on this KIP, and I did not see anything
>>>>> relating to resolving out-of-order computation. I cannot see a way
>>>>> around the current three-SS structure that I have.
>>>>>
>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how to
>>>>> reconcile the iterator obtained from RocksDB with that of the cache.
>>>>> In addition, I must ensure everything is flushed before scanning.
>>>>> Since the materialized prefixScan SS is under "control" of the
>>>>> function, I do not anticipate this to be a problem. Performance
>>>>> throughput will need to be tested, but as Jan observed in his
>>>>> initial overview of this issue, it is generally a surge of output
>>>>> events which affects performance more so than the flush or
>>>>> prefixScan itself.
>>>>>
>>>>> Thoughts on any of these are greatly appreciated, since these
>>>>> elements are really the cornerstone of the whole design. I can put
>>>>> up the code I have written against 1.0.2 if we so desire, but first
>>>>> I was hoping to just tackle some of the fundamental design
>>>>> proposals.
>>>>>
>>>>> Thanks,
>>>>> Adam
>>>>>
>>>>>
>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare
>>>>> <adam.bellem...@gmail.com> wrote:
>>>>>
>>>>>> Here is the new discussion thread for KIP-213. I picked back up on
>>>>>> the KIP as this is something that we too at Flipp are now running
>>>>>> in production. Jan started this last year, and I know that Trivago
>>>>>> is also using something similar in production, at least in terms of
>>>>>> APIs and functionality.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>
>>>>>> I do have an implementation of the code for Kafka 1.0.2 (our local
>>>>>> production version), but I won't post it yet as I would like to
>>>>>> focus on the workflow and design first. That being said, I also
>>>>>> need to add some clearer integration tests (I did a lot of testing
>>>>>> using a non-Kafka-Streams framework) and clean up the code a bit
>>>>>> more before putting it in a PR against trunk (I can do so later
>>>>>> this week, likely).
>>>>>>
>>>>>> Please take a look.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Adam Bellemare
>>>>
>>>> --
>>>> -- Guozhang