For the sake of expediency, I updated the KIP with what I believe we have discussed.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges

On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> My only concern was around compaction of records in the repartition topic.
> This would simply mean that these records would stick around, as their value
> isn't truly null. Though I know about the usage of compaction on changelog
> topics, I am a bit fuzzy on where we use compaction in other internal
> topics. So long as this doesn't cause concern, I can certainly finish off
> the KIP today.
>
> Thanks
>
> Adam
>
> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I agree that the LHS must encode this information and tell the RHS
>> if a tombstone requires a reply or not.
>>
>> >> Would this pose some sort of verbosity problem in the internal topics,
>> >> especially if we have to rebuild state off of them?
>>
>> I don't see an issue atm. Can you elaborate how this relates to rebuilding
>> state?
>>
>>
>> -Matthias
>>
>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>> > Hi Matthias
>> >
>> > I have been mulling over the unsubscribe / delete optimization, and I have
>> > one main concern. I believe that the RHS can only determine whether to
>> > propagate the tombstone or not based on the value passed over from the LHS.
>> > This value would need to be non-null, and so wouldn't the internal
>> > repartition topics end up containing many non-null "tombstone" values?
>> >
>> > ie:
>> > Normal tombstone (propagate):  (key=123, value=null)
>> > Don't-propagate-tombstone:     (key=123, value=("don't propagate me,
>> >                                 but please delete state"))
>> >
>> > Would this pose some sort of verbosity problem in the internal topics,
>> > especially if we have to rebuild state off of them?
>> >
>> > Thanks
>> >
>> > Adam
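
To make the "non-null tombstone" concrete, here is a rough sketch of the kind of envelope being
discussed (hypothetical names and shape, not the actual KIP-213 classes):

    // Hypothetical sketch only -- not the actual KIP-213 code. The LHS writes one of
    // these instructions into the repartition topic instead of a bare null, so the RHS
    // knows whether deleting its state should also emit a tombstone downstream.
    enum Instruction {
        DELETE_AND_PROPAGATE,   // regular delete: clean up RHS state and emit a null result
        DELETE_NO_PROPAGATE     // foreign key changed: clean up RHS state, stay silent
    }

    // Envelope written as the record value in the LHS->RHS repartition topic.
    class SubscriptionWrapper<K> {
        final Instruction instruction;
        final K primaryKey;   // LHS primary key, so the reply can be routed back and re-keyed

        SubscriptionWrapper(final Instruction instruction, final K primaryKey) {
            this.instruction = instruction;
            this.primaryKey = primaryKey;
        }
    }

Adam's compaction point applies to exactly such records: because the envelope is non-null,
compaction of the repartition topic keeps them around, whereas a true null tombstone would
eventually be dropped.
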
>> > On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> >> SGTM.
>> >>
>> >> I also had the impression that those duplicates are rather an error than
>> >> a case of eventual consistency. Using hashing to avoid sending the
>> >> payload is a good idea IMHO.
>> >>
>> >> @Adam: can you update the KIP accordingly?
>> >>
>> >>  - add the optimization to not send a reply from RHS to LHS on
>> >>    unsubscribe (if not a tombstone)
>> >>  - explain why using offsets to avoid duplicates does not work
>> >>  - add hashing to avoid duplicates
>> >>
>> >> Besides this, I don't have any further comments. Excited to finally get
>> >> this in!
>> >>
>> >> Let us know when you have updated the KIP so we can move forward with
>> >> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/8/19 8:47 AM, John Roesler wrote:
>> >>> Hi all,
>> >>>
>> >>> This proposal sounds good to me, especially since we observe that people
>> >>> are already confused when they see duplicate results coming out of 1:1 joins
>> >>> (which is a bug). I take this as "evidence" that we're better off
>> >>> eliminating those duplicates from the start. Guozhang's proposal seems like
>> >>> a lightweight solution to the problem, so FWIW, I'm in favor.
>> >>>
>> >>> Thanks,
>> >>> -John
>> >>>
>> >>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>> >>>
>> >>>> Hi Guozhang
>> >>>>
>> >>>> That would certainly work for eliminating those duplicate values. As it
>> >>>> stands right now, this would be consistent with swallowing changes due to
>> >>>> out-of-order processing with multiple threads, and seems like a very
>> >>>> reasonable way forward. Thank you for the suggestion!
>> >>>>
>> >>>> I have been trying to think if there are any other scenarios where we can
>> >>>> end up with duplicates, though I have not been able to identify any others
>> >>>> at the moment. I will think on it a bit more, but if anyone else has any
>> >>>> ideas, please chime in.
>> >>>>
>> >>>> Thanks,
>> >>>> Adam
>> >>>>
>> >>>>
>> >>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >>>>
>> >>>>> One more thought regarding *c-P2: Duplicates)*: first I want to separate
>> >>>>> this issue from the more general issue that today (not only foreign-key,
>> >>>>> but also co-partitioned primary-key) table-table joins are still not
>> >>>>> strictly respecting the timestamp ordering, since the two changelog streams
>> >>>>> may be fetched and hence processed out-of-order, and we do not allow a
>> >>>>> record to be joined with the other table at any given time snapshot yet.
>> >>>>> So ideally, when there are two changelog records (k1, (f-k1, v1)),
>> >>>>> (k1, (f-k1, v2)) arriving at the left-hand table and one record (f-k1, v3)
>> >>>>> at the right-hand table, depending on the processing order we may get:
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> or
>> >>>>>
>> >>>>> (k1, (f-k1, v1-v3))
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> And this is not to be addressed by this KIP.
>> >>>>>
>> >>>>> What I would advocate is to fix the issue that is introduced in this KIP
>> >>>>> alone, that is, we may have
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> I admit that it does not have a correctness issue from the semantics alone,
>> >>>>> compared with "discarding the first result", but it may be confusing to
>> >>>>> users who do not expect to see the seeming duplicates. On the other hand,
>> >>>>> I think there's a lightweight solution to avoid it: we can still optimize
>> >>>>> away sending the full payload of "v1" from the left-hand side to the
>> >>>>> right-hand side, but instead of just trimming off the whole bytes, we can
>> >>>>> send, e.g., an MD5 hash of the bytes (I'm using MD5 here just as an
>> >>>>> example; we can definitely replace it with other functions). By doing this
>> >>>>> we can discard the join operation if the hash value sent back from the
>> >>>>> right-hand side no longer matches the left-hand side, i.e. we will only
>> >>>>> send:
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> downstream once.
>> >>>>>
>> >>>>> WDYT?
>> >>>>>
>> >>>>>
>> >>>>> Guozhang
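
A minimal sketch of that hash check (illustrative only; MD5 is simply the example used above, and
none of these names come from the actual implementation). The idea: the LHS ships hash(v1) with its
subscription instead of v1 itself, the RHS echoes the hash back alongside its join result, and the
LHS only forwards the result if the hash still matches its current value:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    // Hypothetical helper, not the actual KIP-213 code: decide whether a join result
    // coming back from the RHS is still current, by comparing the value hash it carries
    // against the hash of the latest LHS value for the same key.
    final class StaleResultFilter {

        static byte[] hash(final byte[] value) {
            if (value == null) {
                return null;                        // a deleted LHS value hashes to null
            }
            try {
                return MessageDigest.getInstance("MD5").digest(value);
            } catch (final NoSuchAlgorithmException e) {
                throw new IllegalStateException(e); // MD5 is required on every JVM
            }
        }

        // hashFromReply:   the hash the RHS echoed back with its join result
        // currentLhsValue: the latest value for this key in the LHS state store
        static boolean shouldForward(final byte[] hashFromReply, final byte[] currentLhsValue) {
            return Arrays.equals(hashFromReply, hash(currentLhsValue));
        }
    }

In the v1/v2 example above, the reply computed against v1 carries hash(v1), which no longer matches
the current value v2, so that result is dropped and only (k1, (f-k1, v2-v3)) is forwarded.
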
>> >>>>>
>> >>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Ah yes, I recall it all now. That answers the question as to why I had
>> >>>>>> caching disabled. I can certainly re-enable it, since I believe the main
>> >>>>>> concern was simply about reconciling those two iterators. A lack of
>> >>>>>> knowledge there on my part.
>> >>>>>>
>> >>>>>> Thank you John for weighing in - we certainly both do appreciate it. I
>> >>>>>> think that John hits it on the head though with his comment of "If it
>> >>>>>> turns out we're wrong about this, then it should be possible to fix the
>> >>>>>> semantics in place, without messing with the API."
>> >>>>>>
>> >>>>>> If anyone else would like to weigh in, your thoughts would be greatly
>> >>>>>> appreciated.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>>>
>> >>>>>>>>> I don't know how to range scan over a caching store, probably one had
>> >>>>>>>>> to open 2 iterators and merge them.
>> >>>>>>>
>> >>>>>>> That happens automatically. If you query a cached KTable, it ranges over
>> >>>>>>> the cache and the underlying RocksDB and performs the merging under the
>> >>>>>>> hood.
>> >>>>>>>
>> >>>>>>>>> Other than that, I still think even the regular join is broken with
>> >>>>>>>>> caching enabled right?
>> >>>>>>>
>> >>>>>>> Why? To me, if you use the word "broken", it implies conceptually
>> >>>>>>> incorrect; I don't see this.
>> >>>>>>>
>> >>>>>>>>> I once filed a ticket, because with caching enabled it would return
>> >>>>>>>>> values that haven't been published downstream yet.
>> >>>>>>>
>> >>>>>>> For the bug report, I found
>> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
>> >>>>>>> this, but it is a regular bug like any other, and we should not change a
>> >>>>>>> design because of a bug.
>> >>>>>>>
>> >>>>>>> That range() returns values that have not been published downstream if
>> >>>>>>> caching is enabled is how caching works and is intended behavior. Not
>> >>>>>>> sure why you say it's incorrect?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>> >>>>>>>>
>> >>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>> >>>>>>>>> Thanks Adam,
>> >>>>>>>>>
>> >>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is no
>> >>>>>>>>> functional/correctness requirement to disable caching. I cannot
>> >>>>>>>>> remember why Jan's proposal added this? It might be an
>> >>>>>>>>> implementation detail though (maybe just remove it from the KIP?
>> >>>>>>>>> -- might be misleading).
>> >>>>>>>>
>> >>>>>>>> I don't know how to range scan over a caching store, probably one had
>> >>>>>>>> to open 2 iterators and merge them.
>> >>>>>>>>
>> >>>>>>>> Other than that, I still think even the regular join is broken with
>> >>>>>>>> caching enabled right? I once filed a ticket, because with caching
>> >>>>>>>> enabled it would return values that haven't been published downstream yet.
>> >>>>>>>
>> >>>>>
>> >>>>> --
>> >>>>> -- Guozhang
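
For illustration, the cache/store merging that Matthias describes can be pictured roughly like this
(a simplified model using sorted maps instead of iterators; not the actual Streams internals):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical sketch: a range scan over a cached store is conceptually a merge of
    // two sorted views -- the dirty cache and the underlying RocksDB store -- where a
    // cache entry (including a cached delete) shadows the stored entry for the same key.
    final class MergedRangeScan {

        static NavigableMap<String, String> range(final NavigableMap<String, String> cache,
                                                  final NavigableMap<String, String> store,
                                                  final String from,
                                                  final String to) {
            final NavigableMap<String, String> merged = new TreeMap<>();
            // Start from the persisted view ...
            merged.putAll(store.subMap(from, true, to, true));
            // ... then let newer, not-yet-flushed cache entries win.
            for (final Map.Entry<String, String> dirty : cache.subMap(from, true, to, true).entrySet()) {
                if (dirty.getValue() == null) {
                    merged.remove(dirty.getKey());   // cached tombstone hides the stored value
                } else {
                    merged.put(dirty.getKey(), dirty.getValue());
                }
            }
            return merged;
        }
    }

This is also why, with caching enabled, a range scan can return values that have not yet been
flushed downstream: the dirty cache entries are part of the merged view by design, which is the
intended behavior Matthias refers to above.
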