I guess Adam suggests using compaction for the repartition topic and
not purging its data. Doing this would allow us to avoid a store
changelog topic for the "subscription store" on the RHS. This would be
a nice optimization.
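For concreteness, a rough sketch of the topic setup this would imply
(topic name, partition count, and replication factor are made up;
fragment only, exception handling omitted):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        // cleanup.policy=compact lets the subscription topic double as
        // the changelog for the RHS "subscription store", so no second,
        // separate store changelog topic would be needed.
        NewTopic subscriptionTopic =
            new NewTopic("myapp-subscription-topic", 16, (short) 3)
                .configs(Collections.singletonMap(
                    TopicConfig.CLEANUP_POLICY_CONFIG,
                    TopicConfig.CLEANUP_POLICY_COMPACT));
        admin.createTopics(Collections.singletonList(subscriptionTopic))
             .all().get();
    }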
But the concern about breaking compaction is correct. However, I see it
as an optimization only, and thus, if we keep the topic as a plain
repartition topic and use a separate store changelog topic, the issue
resolves itself. Maybe we could use headers, though, to get this
optimization.

Do you think it's worth doing this optimization, or should we just
stick with the simple design and two topics (repartition plus
changelog)?

@Adam: thanks for updating the Wiki page. LGTM.

-Matthias

On 3/11/19 9:24 AM, John Roesler wrote:
> Hey Adam,
>
> That's a good observation, but it wouldn't be a problem for
> repartition topics, because Streams aggressively deletes messages from
> the repartition topics once it knows they are handled. Thus, we don't
> need to try and cater to the log compactor.
>
> Thanks,
> -John
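For reference, the purge John describes boils down to an admin-client
deleteRecords() call once everything up to the committed offset has
been fully processed. A rough sketch -- the topic name and offset are
made up:

    import java.util.Collections;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    // Drop everything before the committed offset from one partition
    // of an internal repartition topic; Streams issues this call
    // itself after commit.
    TopicPartition tp =
        new TopicPartition("myapp-join-subscription-repartition", 0);
    adminClient.deleteRecords(
        Collections.singletonMap(tp, RecordsToDelete.beforeOffset(12345L))
    ).all().get();

This is also where purging and compaction conflict: as far as I know,
brokers reject deleteRecords() for topics whose cleanup policy does not
include "delete".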
>
> On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
>> For the sake of expediency, I updated the KIP with what I believe we
>> have discussed.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
>>
>>
>> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com>
>> wrote:
>>
>>> My only concern was around compaction of records in the repartition
>>> topic. This would simply mean that these records would stick around,
>>> as their value isn't truly null. Though I know about the usage of
>>> compaction on changelog topics, I am a bit fuzzy on where we use
>>> compaction in other internal topics. So long as this doesn't cause
>>> concern, I can certainly finish off the KIP today.
>>>
>>> Thanks
>>>
>>> Adam
>>>
>>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> I agree that the LHS must encode this information and tell the RHS
>>>> whether a tombstone requires a reply or not.
>>>>
>>>>>> Would this pose some sort of verbosity problem in the internal
>>>>>> topics, especially if we have to rebuild state off of them?
>>>>
>>>> I don't see an issue atm. Can you elaborate on how this relates to
>>>> rebuilding state?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>>>>> Hi Matthias
>>>>>
>>>>> I have been mulling over the unsubscribe/delete optimization, and
>>>>> I have one main concern. I believe that the RHS can only determine
>>>>> whether to propagate the tombstone or not based on the value
>>>>> passed over from the LHS. This value would need to be non-null,
>>>>> and so wouldn't the internal repartition topics end up containing
>>>>> many non-null "tombstone" values?
>>>>>
>>>>> i.e.:
>>>>> Normal tombstone (propagate):  (key=123, value=null)
>>>>> Don't-propagate-tombstone:     (key=123, value=("don't propagate
>>>>>                                 me, but please delete state"))
>>>>>
>>>>> Would this pose some sort of verbosity problem in the internal
>>>>> topics, especially if we have to rebuild state off of them?
>>>>>
>>>>> Thanks
>>>>>
>>>>> Adam
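One way to picture the encoding Adam sketches: rather than a free-form
marker value, the repartition record could carry a small wrapper with
an instruction flag. A hypothetical value type -- all names are made up
for illustration, not a proposed wire format:

    // Hypothetical value for the LHS -> RHS repartition record. A
    // plain null record stays a normal tombstone; a non-null wrapper
    // with DELETE_NO_PROPAGATE says "delete RHS state, but no reply".
    public class SubscriptionValue {
        public enum Instruction {
            DELETE_AND_PROPAGATE, // unsubscribe; RHS replies so the join result is tombstoned
            DELETE_NO_PROPAGATE   // unsubscribe; RHS only deletes its state
        }

        private final Instruction instruction;

        public SubscriptionValue(final Instruction instruction) {
            this.instruction = instruction;
        }

        public Instruction instruction() {
            return instruction;
        }
    }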
>>>>>
>>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> SGTM.
>>>>>>
>>>>>> I also had the impression that those duplicates are rather an
>>>>>> error than a case of eventual consistency. Using hashing to avoid
>>>>>> sending the payload is a good idea IMHO.
>>>>>>
>>>>>> @Adam: can you update the KIP accordingly?
>>>>>>
>>>>>> - add the optimization to not send a reply from RHS to LHS on
>>>>>>   unsubscribe (if not a tombstone)
>>>>>> - explain why using offsets to avoid duplicates does not work
>>>>>> - add hashing to avoid duplicates
>>>>>>
>>>>>> Besides this, I don't have any further comments. Excited to
>>>>>> finally get this in!
>>>>>>
>>>>>> Let us know when you have updated the KIP so we can move forward
>>>>>> with the VOTE. Thanks a lot for your patience! This was a very
>>>>>> loooong one!
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> This proposal sounds good to me, especially since we observe
>>>>>>> that people are already confused when they see duplicate results
>>>>>>> coming out of 1:1 joins (which is a bug). I take this as
>>>>>>> "evidence" that we're better off eliminating those duplicates
>>>>>>> from the start. Guozhang's proposal seems like a lightweight
>>>>>>> solution to the problem, so FWIW, I'm in favor.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang
>>>>>>>>
>>>>>>>> That would certainly work for eliminating those duplicate
>>>>>>>> values. As it stands right now, this would be consistent with
>>>>>>>> swallowing changes due to out-of-order processing with multiple
>>>>>>>> threads, and seems like a very reasonable way forward. Thank
>>>>>>>> you for the suggestion!
>>>>>>>>
>>>>>>>> I have been trying to think of any other scenarios where we can
>>>>>>>> end up with duplicates, though I have not been able to identify
>>>>>>>> any others at the moment. I will think on it a bit more, but if
>>>>>>>> anyone else has any ideas, please chime in.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Adam
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first, I want
>>>>>>>>> to separate this issue from the more general issue that today
>>>>>>>>> (not only foreign-key, but also co-partitioned primary-key)
>>>>>>>>> table-table joins still do not strictly respect timestamp
>>>>>>>>> ordering, since the two changelog streams may be fetched, and
>>>>>>>>> hence processed, out of order, and we do not yet allow a
>>>>>>>>> record to be joined with the other table at any given time
>>>>>>>>> snapshot. So when there are two changelog records
>>>>>>>>> (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left-hand
>>>>>>>>> table and one record (f-k1, v3) at the right-hand table,
>>>>>>>>> depending on the processing order we may get:
>>>>>>>>>
>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>
>>>>>>>>> or
>>>>>>>>>
>>>>>>>>> (k1, (f-k1, v1-v3))
>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>
>>>>>>>>> And this is not to be addressed by this KIP.
>>>>>>>>>
>>>>>>>>> What I would advocate is to fix the issue that is introduced
>>>>>>>>> in this KIP alone, that is, that we may emit:
>>>>>>>>>
>>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>
>>>>>>>>> I admit that this is not a correctness issue from the
>>>>>>>>> semantics alone, compared with "discarding the first result",
>>>>>>>>> but it may be confusing to users who do not expect to see the
>>>>>>>>> seemingly duplicate results. On the other hand, I think
>>>>>>>>> there's a lightweight solution to avoid it: we can still
>>>>>>>>> optimize away sending the full payload of "v1" from the
>>>>>>>>> left-hand side to the right-hand side, but instead of just
>>>>>>>>> trimming off the whole bytes, we can send, e.g., an MD5 hash
>>>>>>>>> of the bytes (I'm using MD5 here just as an example; we can
>>>>>>>>> definitely replace it with other functions). By doing this, we
>>>>>>>>> can discard the join operation if the hash value sent back
>>>>>>>>> from the right-hand side no longer matches the left-hand side,
>>>>>>>>> i.e. we will only send:
>>>>>>>>>
>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>
>>>>>>>>> downstream once.
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
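A minimal sketch of the check Guozhang describes, on the LHS when the
RHS response arrives. The response type, its originalValueHash()
accessor, and the surrounding names are made up for illustration:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    static byte[] md5(final byte[] bytes) {
        try {
            return MessageDigest.getInstance("MD5").digest(bytes);
        } catch (final NoSuchAlgorithmException impossible) {
            throw new RuntimeException(impossible); // MD5 ships with every JVM
        }
    }

    // Only emit the join result if the hash the RHS echoed back still
    // matches the current LHS value. If the value changed (v1 -> v2)
    // while the request was in flight, drop the stale response instead
    // of emitting the seemingly duplicate (k1, (f-k1, v2-v3)).
    byte[] currentHash = md5(valueSerializer.serialize(topic, currentValue));
    if (Arrays.equals(currentHash, response.originalValueHash())) {
        context.forward(key, joinResult);
    }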
>>>>>>>>>
>>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ah yes, I recall it all now. That answers the question as to
>>>>>>>>>> why I had caching disabled. I can certainly re-enable it,
>>>>>>>>>> since I believe the main concern was simply about reconciling
>>>>>>>>>> those two iterators. A lack of knowledge there on my part.
>>>>>>>>>>
>>>>>>>>>> Thank you, John, for weighing in - we certainly both
>>>>>>>>>> appreciate it. I think that John hits it on the head, though,
>>>>>>>>>> with his comment of "If it turns out we're wrong about this,
>>>>>>>>>> then it should be possible to fix the semantics in place,
>>>>>>>>>> without messing with the API."
>>>>>>>>>>
>>>>>>>>>> If anyone else would like to weigh in, your thoughts would be
>>>>>>>>>> greatly appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>>> I don't know how to range scan over a caching store;
>>>>>>>>>>>>> probably one had to open 2 iterators and merge them.
>>>>>>>>>>>
>>>>>>>>>>> That happens automatically. If you query a cached KTable, it
>>>>>>>>>>> ranges over the cache and the underlying RocksDB and performs
>>>>>>>>>>> the merging under the hood.
>>>>>>>>>>>
>>>>>>>>>>>>> Other than that, I still think even the regular join is
>>>>>>>>>>>>> broken with caching enabled, right?
>>>>>>>>>>>
>>>>>>>>>>> Why? To me, the word "broken" implies it is conceptually
>>>>>>>>>>> incorrect; I don't see this.
>>>>>>>>>>>
>>>>>>>>>>>>> I once filed a ticket, because with caching enabled it
>>>>>>>>>>>>> would return values that haven't been published downstream
>>>>>>>>>>>>> yet.
>>>>>>>>>>>
>>>>>>>>>>> For the bug report, I found
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still
>>>>>>>>>>> need to fix this, but it is a regular bug like any other, and
>>>>>>>>>>> we should not change a design because of a bug.
>>>>>>>>>>>
>>>>>>>>>>> That range() returns values that have not been published
>>>>>>>>>>> downstream if caching is enabled is how caching works and is
>>>>>>>>>>> intended behavior. Not sure why you say it's incorrect?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>>>>>>>> Thanks Adam,
>>>>>>>>>>>>>
>>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus,
>>>>>>>>>>>>> there is no functional/correctness requirement to disable
>>>>>>>>>>>>> caching. I cannot remember why Jan's proposal added this?
>>>>>>>>>>>>> It might be an implementation detail though (maybe just
>>>>>>>>>>>>> remove it from the KIP? -- might be misleading).
>>>>>>>>>>>>
>>>>>>>>>>>> I don't know how to range scan over a caching store;
>>>>>>>>>>>> probably one had to open 2 iterators and merge them.
>>>>>>>>>>>>
>>>>>>>>>>>> Other than that, I still think even the regular join is
>>>>>>>>>>>> broken with caching enabled, right? I once filed a ticket,
>>>>>>>>>>>> because with caching enabled it would return values that
>>>>>>>>>>>> haven't been published downstream yet.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
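Regarding the range-scan question at the bottom of the thread: the
caller never has to open and merge two iterators. A sketch of a range
query over a cached store -- the store name and value types are made
up:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("my-ktable-store", QueryableStoreTypes.keyValueStore());
    // One iterator: merging the cache with the underlying RocksDB
    // instance happens inside the store layer.
    try (KeyValueIterator<String, Long> iter = store.range("a", "z")) {
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            System.out.println(entry.key + " -> " + entry.value);
        }
    }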