> I am more inclined to go with keeping it consistent and
> separated into a normal repartition topic and a normal changelog topic
> otherwise.
SGTM.

-Matthias

On 3/11/19 11:18 AM, Adam Bellemare wrote:
> Hi John
>
> Thanks for the explanation. I wasn't sure how KTable repartition topics
> were handled with regard to cleanup, but I just wanted to double check
> whether it could cause an issue.
>
> @Matthias
> My inclination is to keep the DSL topologies consistent with one another. I
> am a bit concerned about scope creep into the header domain, and I am not
> sure how much performance would be improved vs. the additional complexity. I
> think if we go down this approach we should consider a new type of internal
> topic so that it's not confused with the existing repartition and changelog
> topic types. I am more inclined to go with keeping it consistent and
> separated into a normal repartition topic and a normal changelog topic
> otherwise.
>
> Thanks
> Adam
>
> On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I guess Adam suggests to use compaction for the repartition topic and
>> not purge data. Doing this would allow us to avoid a store changelog
>> topic for the "subscription store" on the RHS. This would be a nice
>> optimization.
>>
>> But the concern about breaking compaction is correct. However, I see it
>> as an optimization only, and thus, if we keep the topic as a plain
>> repartition topic and use a separate store changelog topic, the issue
>> resolves itself.
>>
>> Maybe we could use headers, though, to get this optimization. Do you
>> think it's worth doing this optimization, or should we just stick with
>> the simple design and two topics (repartition plus changelog)?
>>
>> @Adam: thanks for updating the wiki page. LGTM.
>>
>> -Matthias
>>
>> On 3/11/19 9:24 AM, John Roesler wrote:
>>> Hey Adam,
>>>
>>> That's a good observation, but it wouldn't be a problem for repartition
>>> topics, because Streams aggressively deletes messages from the repartition
>>> topics once it knows they are handled.
>>> Thus, we don't need to try and cater
>>> to the log compactor.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>
>>>> For the sake of expediency, I updated the KIP with what I believe we have
>>>> discussed.
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
>>>>
>>>> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>> wrote:
>>>>
>>>>> My only concern was around compaction of records in the repartition topic.
>>>>> This would simply mean that these records would stick around, as their
>>>>> value isn't truly null. Though I know about the usage of compaction on
>>>>> changelog topics, I am a bit fuzzy on where we use compaction in other
>>>>> internal topics. So long as this doesn't cause concern, I can certainly
>>>>> finish off the KIP today.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Adam
>>>>>
>>>>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I agree that the LHS must encode this information and tell the RHS
>>>>>> whether a tombstone requires a reply or not.
>>>>>>
>>>>>>>> Would this pose some sort of verbosity problem in the internal topics,
>>>>>>>> especially if we have to rebuild state off of them?
>>>>>>
>>>>>> I don't see an issue atm. Can you elaborate on how this relates to
>>>>>> rebuilding state?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>>>>>>> Hi Matthias
>>>>>>>
>>>>>>> I have been mulling over the unsubscribe / delete optimization, and I
>>>>>>> have one main concern. I believe that the RHS can only determine whether
>>>>>>> to propagate the tombstone or not based on the value passed over from
>>>>>>> the LHS.
>>>>>>> This value would need to be non-null, and so wouldn't the internal
>>>>>>> repartition topics end up containing many non-null "tombstone" values?
>>>>>>>
>>>>>>> ie:
>>>>>>> Normal tombstone (propagate):  (key=123, value=null)
>>>>>>> Don't-propagate-tombstone:     (key=123, value=("don't propagate me,
>>>>>>>                                 but please delete state"))
>>>>>>>
>>>>>>> Would this pose some sort of verbosity problem in the internal topics,
>>>>>>> especially if we have to rebuild state off of them?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> SGTM.
>>>>>>>>
>>>>>>>> I also had the impression that those duplicates are an error rather
>>>>>>>> than a case of eventual consistency. Using hashing to avoid sending the
>>>>>>>> payload is a good idea IMHO.
>>>>>>>>
>>>>>>>> @Adam: can you update the KIP accordingly?
>>>>>>>>
>>>>>>>> - add the optimization to not send a reply from RHS to LHS on
>>>>>>>>   unsubscribe (if not a tombstone)
>>>>>>>> - explain why using offsets to avoid duplicates does not work
>>>>>>>> - add hashing to avoid duplicates
>>>>>>>>
>>>>>>>> Besides this, I don't have any further comments. Excited to finally get
>>>>>>>> this in!
>>>>>>>>
>>>>>>>> Let us know when you have updated the KIP so we can move forward with
>>>>>>>> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> This proposal sounds good to me, especially since we observe that
>>>>>>>>> people are already confused when they see duplicate results coming out
>>>>>>>>> of 1:1 joins (which is a bug). I take this as "evidence" that we're
>>>>>>>>> better off eliminating those duplicates from the start.
>>>>>>>>> Guozhang's proposal seems like
>>>>>>>>> a lightweight solution to the problem, so FWIW, I'm in favor.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guozhang
>>>>>>>>>>
>>>>>>>>>> That would certainly work for eliminating those duplicate values. As
>>>>>>>>>> it stands right now, this would be consistent with swallowing changes
>>>>>>>>>> due to out-of-order processing with multiple threads, and seems like
>>>>>>>>>> a very reasonable way forward. Thank you for the suggestion!
>>>>>>>>>>
>>>>>>>>>> I have been trying to think of any other scenarios where we can end
>>>>>>>>>> up with duplicates, though I have not been able to identify any
>>>>>>>>>> others at the moment. I will think on it a bit more, but if anyone
>>>>>>>>>> else has any ideas, please chime in.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first, I want to
>>>>>>>>>>> separate this issue from the more general issue that today (not only
>>>>>>>>>>> foreign-key, but also co-partitioned primary-key) table-table joins
>>>>>>>>>>> still do not strictly respect timestamp ordering, since the two
>>>>>>>>>>> changelog streams may be fetched and hence processed out of order,
>>>>>>>>>>> and we do not yet allow a record to be joined with the other table
>>>>>>>>>>> at any given time snapshot.
>>>>>>>>>>> So ideally, when
>>>>>>>>>>> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2))
>>>>>>>>>>> coming at the left-hand table and one record (f-k1, v3) at the
>>>>>>>>>>> right-hand table, depending on the processing ordering we may get:
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> or
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v1-v3))
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> And this is not to be addressed by this KIP.
>>>>>>>>>>>
>>>>>>>>>>> What I would advocate is to fix only the issue that is introduced in
>>>>>>>>>>> this KIP, that is, we may have:
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> I admit that it does not have a correctness issue from the semantics
>>>>>>>>>>> alone, comparing it with "discarding the first result", but it may
>>>>>>>>>>> be confusing to users who do not expect to see the seeming
>>>>>>>>>>> duplicates. On the other hand, I think there's a lightweight
>>>>>>>>>>> solution to avoid it: we can still optimize away sending the full
>>>>>>>>>>> payload of "v1" from the left-hand side to the right-hand side, but
>>>>>>>>>>> instead of just trimming off the whole bytes, we can send, e.g., an
>>>>>>>>>>> MD5 hash of the bytes (I'm using MD5 here just as an example; we can
>>>>>>>>>>> definitely replace it with other functions). By doing this, we can
>>>>>>>>>>> discard the join operation if the hash value sent back from the
>>>>>>>>>>> right-hand side no longer matches the left-hand side, i.e., we will
>>>>>>>>>>> only send:
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> downstream once.
>>>>>>>>>>>
>>>>>>>>>>> WDYT?
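Guozhang's hashing idea can be sketched roughly as follows. This is only an illustrative sketch with invented names (not the KIP's actual types or the eventual Streams implementation): the LHS ships a digest of the value instead of the full payload, and an RHS reply is dropped whenever its digest no longer matches the current LHS value.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Rough sketch of the hash comparison described above: instead of sending
// the full prior value, the LHS sends its digest; when the RHS reply comes
// back, the LHS drops it if the digest no longer matches the current value.
public class HashedSubscriptionSketch {

    static byte[] digest(byte[] value) {
        try {
            return MessageDigest.getInstance("MD5").digest(value);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available
        }
    }

    // Emit the joined result only if the digest echoed back by the RHS still
    // matches the value currently stored on the LHS.
    static boolean shouldEmit(byte[] currentLhsValue, byte[] digestFromReply) {
        return Arrays.equals(digest(currentLhsValue), digestFromReply);
    }

    public static void main(String[] args) {
        byte[] v1 = "v1".getBytes(StandardCharsets.UTF_8);
        byte[] v2 = "v2".getBytes(StandardCharsets.UTF_8);
        // The reply for the stale v1 subscription arrives after v2 replaced
        // it, so only the v2 join result is emitted, avoiding the duplicate.
        System.out.println(shouldEmit(v2, digest(v1))); // false: drop stale reply
        System.out.println(shouldEmit(v2, digest(v2))); // true: emit current join
    }
}
```

The digest function itself is interchangeable, as Guozhang notes; MD5 is used here only because the message uses it as its example.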
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah yes, I recall it all now. That answers the question as to why I
>>>>>>>>>>>> had caching disabled. I can certainly re-enable it, since I believe
>>>>>>>>>>>> the main concern was simply about reconciling those two iterators.
>>>>>>>>>>>> A lack of knowledge there on my part.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you, John, for weighing in - we certainly both appreciate it.
>>>>>>>>>>>> I think that John hits it on the head, though, with his comment of
>>>>>>>>>>>> "If it turns out we're wrong about this, then it should be possible
>>>>>>>>>>>> to fix the semantics in place, without messing with the API."
>>>>>>>>>>>>
>>>>>>>>>>>> If anyone else would like to weigh in, your thoughts would be
>>>>>>>>>>>> greatly appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't know how to range scan over a caching store; probably
>>>>>>>>>>>>>>> one had to open 2 iterators and merge them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That happens automatically. If you query a cached KTable, it
>>>>>>>>>>>>> ranges over the cache and the underlying RocksDB and performs the
>>>>>>>>>>>>> merging under the hood.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Other than that, I still think even the regular join is broken
>>>>>>>>>>>>>>> with caching enabled, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Why? To me, using the word "broken" implies it is conceptually
>>>>>>>>>>>>> incorrect; I don't see this.
>>>>>>>>>>>>>> I once filed a ticket, because with caching
>>>>>>>>>>>>>> enabled it would return values that haven't been published
>>>>>>>>>>>>>> downstream yet.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the bug report, I found
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
>>>>>>>>>>>>> fix this, but it is a regular bug like any other, and we should
>>>>>>>>>>>>> not change a design because of a bug.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That range() returns values that have not been published
>>>>>>>>>>>>> downstream if caching is enabled is how caching works and is
>>>>>>>>>>>>> intended behavior. Not sure why you say it's incorrect?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>>>>>>>>>> Thanks Adam,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
>>>>>>>>>>>>>>> no functional/correctness requirement to disable caching. I
>>>>>>>>>>>>>>> cannot remember why Jan's proposal added this? It might be an
>>>>>>>>>>>>>>> implementation detail though (maybe just remove it from the KIP?
>>>>>>>>>>>>>>> -- might be misleading).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't know how to range scan over a caching store; probably one
>>>>>>>>>>>>>> had to open 2 iterators and merge them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Other than that, I still think even the regular join is broken
>>>>>>>>>>>>>> with caching enabled, right? I once filed a ticket, because with
>>>>>>>>>>>>>> caching enabled it would return values that haven't been
>>>>>>>>>>>>>> published downstream yet.
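Matthias's point that the cache and the underlying store are merged "under the hood" during a range scan can be pictured with a minimal sketch. Plain TreeMaps stand in for the cache and RocksDB here, and the real Streams code merges two sorted iterators lazily rather than materializing a combined map; this is an illustration of the semantics only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustration of range-scan semantics over a caching store: entries still
// sitting in the (dirty) cache shadow entries with the same key in the
// underlying store, so the scan observes not-yet-flushed values.
public class MergedRangeScanSketch {

    static List<Map.Entry<String, String>> rangeScan(
            SortedMap<String, String> cache, SortedMap<String, String> store) {
        SortedMap<String, String> merged = new TreeMap<>(store);
        merged.putAll(cache); // cache wins on key collisions
        return new ArrayList<>(merged.entrySet());
    }

    static String demo() {
        SortedMap<String, String> cache = new TreeMap<>();
        cache.put("b", "cachedB"); // dirty entry, not yet flushed to the store
        SortedMap<String, String> store = new TreeMap<>();
        store.put("a", "A");
        store.put("b", "storeB");
        store.put("c", "C");
        return rangeScan(cache, store).toString();
    }

    public static void main(String[] args) {
        // The scan sees the cached value for "b" rather than the stored one,
        // which is the intended caching behavior Matthias describes.
        System.out.println(demo()); // → [a=A, b=cachedB, c=C]
    }
}
```

This also shows why such a scan can return values that have not yet been forwarded downstream: the cache is consulted before it is flushed.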
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
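To make the tombstone discussion earlier in the thread concrete: one way to picture the "non-null tombstone" Adam describes is an envelope whose value carries an instruction, so the RHS always cleans up its subscription state but only propagates a delete downstream when asked to. This is an illustrative sketch with invented names, not the KIP's actual types.

```java
// Sketch of the "non-null tombstone" idea: the repartition record's value is
// non-null so it can carry an instruction; the RHS always deletes its
// subscription state, but forwards a null downstream only when the
// instruction asks for propagation.
public class TombstoneInstructionSketch {

    enum Instruction {
        DELETE_AND_PROPAGATE, // normal tombstone: delete state, emit null downstream
        DELETE_NO_PROPAGATE   // "don't propagate me, but please delete state"
    }

    // RHS-side decision: state cleanup always happens; propagation is conditional.
    static boolean forwardTombstone(Instruction instruction) {
        return instruction == Instruction.DELETE_AND_PROPAGATE;
    }

    public static void main(String[] args) {
        System.out.println(forwardTombstone(Instruction.DELETE_AND_PROPAGATE)); // true
        System.out.println(forwardTombstone(Instruction.DELETE_NO_PROPAGATE));  // false
    }
}
```

Since compaction is not applied to repartition topics (Streams purges handled records aggressively, as John notes), these non-null "tombstone" records do not linger past their consumption.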