SGTM. I also had the impression that those duplicates are an error rather than a case of eventual consistency. Using hashing to avoid sending the payload is a good idea IMHO.
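To make that hashing idea concrete, here is a minimal sketch of how the left-hand side could compare hashes to drop a stale response instead of shipping the full prior payload. The class and method names below are made up for illustration; this is not the KIP's actual implementation, and MD5 is only used as an example digest, as Guozhang notes further down in the thread.

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    // Hypothetical sketch only -- not the KIP-213 implementation. The LHS sends a
    // hash of its serialized value along with the subscription; the RHS echoes the
    // hash back with the join result instead of the original payload.
    public class HashedResponseFilter {

        static byte[] hash(byte[] serializedLeftValue) {
            try {
                // MD5 is just an example digest, as noted in the thread below.
                return MessageDigest.getInstance("MD5").digest(serializedLeftValue);
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }

        // Returns true if the response coming back from the RHS still matches the
        // value currently stored on the LHS; if the LHS value changed while the
        // request was in flight, the hashes differ and the stale result is dropped.
        static boolean shouldEmit(byte[] hashEchoedByRhs, byte[] currentSerializedLeftValue) {
            return Arrays.equals(hashEchoedByRhs, hash(currentSerializedLeftValue));
        }
    }

With such a check, only the result built from the still-current LHS value is forwarded, so the duplicate (k1, (f-k1, v2-v3)) output described further down in the thread would be emitted only once.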
@Adam: can you update the KIP accordingly?

- add the optimization to not send a reply from RHS to LHS on unsubscribe
  (if not a tombstone)
- explain why using offsets to avoid duplicates does not work
- add hashing to avoid duplicates

Besides this, I don't have any further comments. Excited to finally get
this in! Let us know when you have updated the KIP so we can move forward
with the VOTE.

Thanks a lot for your patience! This was a very loooong shot!

-Matthias

On 3/8/19 8:47 AM, John Roesler wrote:
> Hi all,
>
> This proposal sounds good to me, especially since we observe that people
> are already confused when they see duplicate results coming out of 1:1
> joins (which is a bug). I take this as "evidence" that we're better off
> eliminating those duplicates from the start. Guozhang's proposal seems
> like a lightweight solution to the problem, so FWIW, I'm in favor.
>
> Thanks,
> -John
>
> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
>> Hi Guozhang
>>
>> That would certainly work for eliminating those duplicate values. As it
>> stands right now, this would be consistent with swallowing changes due
>> to out-of-order processing with multiple threads, and seems like a very
>> reasonable way forward. Thank you for the suggestion!
>>
>> I have been trying to think of other scenarios where we can end up with
>> duplicates, though I have not been able to identify any others at the
>> moment. I will think on it a bit more, but if anyone else has any
>> ideas, please chime in.
>>
>> Thanks,
>> Adam
>>
>>
>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> One more thought regarding *c-P2: Duplicates)*: first I want to
>>> separate this issue from the more general issue that today, table-table
>>> joins (not only foreign-key, but also co-partitioned primary-key ones)
>>> still do not strictly respect timestamp ordering, since the two
>>> changelog streams may be fetched and hence processed out of order, and
>>> we do not yet allow a record to be joined with the other table at any
>>> given time snapshot. So when there are two changelog records
>>> (k1, (f-k1, v1)), (k1, (f-k1, v2)) arriving at the left-hand table and
>>> one record (f-k1, v3) at the right-hand table, depending on the
>>> processing order we may get:
>>>
>>> (k1, (f-k1, v2-v3))
>>>
>>> or
>>>
>>> (k1, (f-k1, v1-v3))
>>> (k1, (f-k1, v2-v3))
>>>
>>> And this is not to be addressed by this KIP.
>>>
>>> What I would advocate is to fix the issue that is introduced in this
>>> KIP alone, namely that we may emit:
>>>
>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>> (k1, (f-k1, v2-v3))
>>>
>>> I admit that this is not a correctness issue from the semantics alone,
>>> compared with "discarding the first result", but it may be confusing to
>>> users who do not expect to see the seeming duplicates. On the other
>>> hand, I think there's a lightweight solution to avoid it: we can still
>>> optimize away sending the full payload of "v1" from the left-hand side
>>> to the right-hand side, but instead of trimming off the bytes entirely,
>>> we can send, e.g., an MD5 hash of the bytes (I'm using MD5 here just as
>>> an example; we can definitely replace it with other functions). By
>>> doing this we can discard the join result if the hash value sent back
>>> from the right-hand side no longer matches the left-hand side, i.e. we
>>> will only send:
>>>
>>> (k1, (f-k1, v2-v3))
>>>
>>> downstream once.
>>>
>>> WDYT?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>
>>>> Ah yes, I recall it all now. That answers the question as to why I had
>>>> caching disabled. I can certainly re-enable it, since I believe the
>>>> main concern was simply about reconciling those two iterators -- a
>>>> lack of knowledge there on my part.
>>>>
>>>> Thank you, John, for weighing in -- we certainly both appreciate it. I
>>>> think John hits it on the head, though, with his comment: "If it turns
>>>> out we're wrong about this, then it should be possible to fix the
>>>> semantics in place, without messing with the API."
>>>>
>>>> If anyone else would like to weigh in, your thoughts would be greatly
>>>> appreciated.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>>>> I don't know how to range scan over a caching store; probably one
>>>>>>> had to open 2 iterators and merge them.
>>>>>
>>>>> That happens automatically. If you query a cached KTable, it ranges
>>>>> over the cache and the underlying RocksDB and performs the merging
>>>>> under the hood.
>>>>>
>>>>>>> Other than that, I still think even the regular join is broken
>>>>>>> with caching enabled, right?
>>>>>
>>>>> Why? To me, using the word "broken" implies it is conceptually
>>>>> incorrect; I don't see this.
>>>>>
>>>>>>> I once filed a ticket, because with caching enabled it would
>>>>>>> return values that haven't been published downstream yet.
>>>>>
>>>>> For the bug report, I found
>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
>>>>> fix this, but it is a regular bug like any other, and we should not
>>>>> change a design because of a bug.
>>>>>
>>>>> That range() returns values that have not been published downstream
>>>>> when caching is enabled is how caching works and is intended
>>>>> behavior. Not sure why you say it's incorrect?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>
>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>> Thanks Adam,
>>>>>>>
>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
>>>>>>> no functional/correctness requirement to disable caching. I cannot
>>>>>>> remember why Jan's proposal added this? It might be an
>>>>>>> implementation detail though (maybe just remove it from the KIP?
>>>>>>> -- might be misleading).
>>>>>>
>>>>>> I don't know how to range scan over a caching store; probably one
>>>>>> had to open 2 iterators and merge them.
>>>>>>
>>>>>> Other than that, I still think even the regular join is broken with
>>>>>> caching enabled, right? I once filed a ticket, because with caching
>>>>>> enabled it would return values that haven't been published
>>>>>> downstream yet.
>>>
>>> --
>>> -- Guozhang
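As an aside on the range-scan discussion quoted above: the "merging under the hood" that Matthias describes for a cached KTable can be pictured with the toy sketch below. The two TreeMaps are only stand-ins I am assuming for the record cache and the underlying RocksDB store; the real caching store merges two iterators lazily rather than copying entries into maps.

    import java.util.Map;
    import java.util.TreeMap;

    // Toy illustration (assumed names, not Kafka Streams internals): a range scan
    // over a cached store sees the union of cache and store, with the cache's
    // dirty entries shadowing the store's entries for the same key.
    public class CachedRangeScanSketch {

        public static void main(String[] args) {
            TreeMap<String, String> store = new TreeMap<>(); // stands in for RocksDB
            TreeMap<String, String> cache = new TreeMap<>(); // stands in for the record cache

            store.put("a", "store-a");
            store.put("b", "store-b");
            cache.put("b", "cache-b"); // dirty entry, not yet flushed downstream
            cache.put("c", "cache-c");

            // Range scan over ["a", "c"]: merge both sorted views, cache wins on collisions.
            TreeMap<String, String> merged =
                new TreeMap<>(store.subMap("a", true, "c", true));
            merged.putAll(cache.subMap("a", true, "c", true));

            for (Map.Entry<String, String> e : merged.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
            // Prints: a -> store-a, b -> cache-b, c -> cache-c
        }
    }

Note that the scan returns the cached values for "b" and "c" even though they may not have been forwarded downstream yet -- the behavior debated around KAFKA-6599 above, which Matthias argues is intended for caching.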