My only concern was around compaction of records in the repartition topic: because the value of these records isn't truly null, compaction would never clean them up and they would simply stick around. Though I know compaction is used on changelog topics, I am a bit fuzzy on whether we use compaction on any other internal topics. So long as this doesn't cause concern, I can certainly finish off the KIP today.
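To make this concrete, a rough sketch of the kind of non-null "tombstone" wrapper we have been discussing might look like the following (the class, enum, and field names are made up for illustration only, not the KIP's final API); since the serialized value bytes are non-null, log compaction would retain these records rather than cleaning them up:

    // Illustrative sketch only -- not the KIP's actual classes.
    // A delete that must not be propagated is still sent through the
    // repartition topic as a non-null wrapper carrying an instruction
    // for the RHS, which is why compaction cannot treat it as a tombstone.
    public class SubscriptionWrapper {

        public enum Instruction {
            DELETE_KEY_AND_PROPAGATE,   // delete RHS state and forward the tombstone
            DELETE_KEY_NO_PROPAGATE,    // delete RHS state, but send no reply to the LHS
            PROPAGATE                   // normal subscribe / update
        }

        private final Instruction instruction;
        private final byte[] valueHash;   // e.g. a hash of the LHS value; see the hashing discussion below

        public SubscriptionWrapper(final Instruction instruction, final byte[] valueHash) {
            this.instruction = instruction;
            this.valueHash = valueHash;
        }

        public Instruction instruction() {
            return instruction;
        }

        public byte[] valueHash() {
            return valueHash;
        }
    }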
Thanks
Adam

On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I agree that the LHS side must encode this information and tell the RHS
> if a tombstone requires a reply or not.
>
> >> Would this pose some sort of verbosity problem in the internal topics,
> >> especially if we have to rebuild state off of them?
>
> I don't see an issue atm. Can you elaborate on how this relates to
> rebuilding state?
>
>
> -Matthias
>
> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> > Hi Matthias
> >
> > I have been mulling over the unsubscribe / delete optimization, and I
> > have one main concern. I believe that the RHS can only determine
> > whether to propagate the tombstone or not based on the value passed
> > over from the LHS. This value would need to be non-null, and so
> > wouldn't the internal repartition topics end up containing many
> > non-null "tombstone" values?
> >
> > ie:
> > Normal tombstone (propagate):  (key=123, value=null)
> > Don't-propagate-tombstone:     (key=123, value=("don't propagate me,
> >                                 but please delete state"))
> >
> > Would this pose some sort of verbosity problem in the internal topics,
> > especially if we have to rebuild state off of them?
> >
> > Thanks
> >
> > Adam
> >
> >
> > On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> SGTM.
> >>
> >> I also had the impression that those duplicates are rather an error
> >> than a case of eventual consistency. Using hashing to avoid sending
> >> the payload is a good idea IMHO.
> >>
> >> @Adam: can you update the KIP accordingly?
> >>
> >>   - add the optimization to not send a reply from RHS to LHS on
> >>     unsubscribe (if not a tombstone)
> >>   - explain why using offsets to avoid duplicates does not work
> >>   - add hashing to avoid duplicates
> >>
> >> Beside this, I don't have any further comments. Excited to finally get
> >> this in!
> >>
> >> Let us know when you have updated the KIP so we can move forward with
> >> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
> >>
> >>
> >> -Matthias
> >>
> >> On 3/8/19 8:47 AM, John Roesler wrote:
> >>> Hi all,
> >>>
> >>> This proposal sounds good to me, especially since we observe that
> >>> people are already confused when they see duplicate results coming
> >>> out of 1:1 joins (which is a bug). I take this as "evidence" that
> >>> we're better off eliminating those duplicates from the start.
> >>> Guozhang's proposal seems like a lightweight solution to the problem,
> >>> so FWIW, I'm in favor.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Guozhang
> >>>>
> >>>> That would certainly work for eliminating those duplicate values. As
> >>>> it stands right now, this would be consistent with swallowing changes
> >>>> due to out-of-order processing with multiple threads, and seems like
> >>>> a very reasonable way forward. Thank you for the suggestion!
> >>>>
> >>>> I have been trying to think if there are any other scenarios where we
> >>>> can end up with duplicates, though I have not been able to identify
> >>>> any others at the moment. I will think on it a bit more, but if anyone
> >>>> else has any ideas, please chime in.
> >>>>
> >>>> Thanks,
> >>>> Adam
> >>>>
> >>>>
> >>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
> >>>>> separate this issue from the more general issue that today (not only
> >>>>> foreign-key, but also co-partitioned primary-key) table-table joins
> >>>>> are still not strictly respecting the timestamp ordering, since the
> >>>>> two changelog streams may be fetched and hence processed out-of-order
> >>>>> and we do not allow a record to be joined with the other table at any
> >>>>> given time snapshot yet. So ideally, when there are two changelog
> >>>>> records (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left hand
> >>>>> table and one record (f-k1, v3) at the right hand table, depending on
> >>>>> the processing ordering we may get:
> >>>>>
> >>>>>   (k1, (f-k1, v2-v3))
> >>>>>
> >>>>> or
> >>>>>
> >>>>>   (k1, (f-k1, v1-v3))
> >>>>>   (k1, (f-k1, v2-v3))
> >>>>>
> >>>>> And this is not to be addressed by this KIP.
> >>>>>
> >>>>> What I would advocate is to fix the issue that is introduced in this
> >>>>> KIP alone, that is we may have
> >>>>>
> >>>>>   (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> >>>>>   (k1, (f-k1, v2-v3))
> >>>>>
> >>>>> I admit that it does not have a correctness issue from the semantics
> >>>>> alone, compared with "discarding the first result", but it may be
> >>>>> confusing to users who do not expect to see the seeming duplicates.
> >>>>> On the other hand, I think there's a light solution to avoid it,
> >>>>> which is that we can still optimize away sending the full payload of
> >>>>> "v1" from the left hand side to the right hand side, but instead of
> >>>>> just trimming off the whole bytes, we can send, e.g., an MD5 hash of
> >>>>> the bytes (I'm using MD5 here just as an example, we can definitely
> >>>>> replace it with other functions). By doing this we can discard the
> >>>>> join result if the hash value sent back from the right hand side no
> >>>>> longer matches the left hand side, i.e. we will only send:
> >>>>>
> >>>>>   (k1, (f-k1, v2-v3))
> >>>>>
> >>>>> downstream once.
> >>>>>
> >>>>> WDYT?
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Ah yes, I recall it all now. That answers that question as to why I
> >>>>>> had caching disabled. I can certainly re-enable it, since I believe
> >>>>>> the main concern was simply about reconciling those two iterators --
> >>>>>> a lack of knowledge there on my part.
> >>>>>>
> >>>>>>
> >>>>>> Thank you John for weighing in - we certainly both do appreciate it.
> >>>>>> I think that John hits it on the head though with his comment of "If
> >>>>>> it turns out we're wrong about this, then it should be possible to
> >>>>>> fix the semantics in place, without messing with the API."
> >>>>>>
> >>>>>> If anyone else would like to weigh in, your thoughts would be
> >>>>>> greatly appreciated.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>>>> I don't know how to range scan over a caching store, probably one
> >>>>>>>>> had to open 2 iterators and merge them.
> >>>>>>>
> >>>>>>> That happens automatically.
> >>>>>>> If you query a cached KTable, it ranges over the cache and the
> >>>>>>> underlying RocksDB and performs the merging under the hood.
> >>>>>>>
> >>>>>>>>> Other than that, I still think even the regular join is broken
> >>>>>>>>> with caching enabled, right?
> >>>>>>>
> >>>>>>> Why? To me, using the word "broken" implies it is conceptually
> >>>>>>> incorrect; I don't see this.
> >>>>>>>
> >>>>>>>>> I once filed a ticket, because with caching enabled it would
> >>>>>>>>> return values that haven't been published downstream yet.
> >>>>>>>
> >>>>>>> For the bug report, I found
> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
> >>>>>>> fix this, but it is a regular bug like any other, and we should not
> >>>>>>> change a design because of a bug.
> >>>>>>>
> >>>>>>> That range() returns values that have not been published downstream
> >>>>>>> if caching is enabled is how caching works and is intended behavior.
> >>>>>>> Not sure why you say it's incorrect?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> >>>>>>>>
> >>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
> >>>>>>>>> Thanks Adam,
> >>>>>>>>>
> >>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
> >>>>>>>>> no functional/correctness requirement to disable caching. I cannot
> >>>>>>>>> remember why Jan's proposal added this? It might be an
> >>>>>>>>> implementation detail though (maybe just remove it from the KIP?
> >>>>>>>>> -- might be misleading).
> >>>>>>>>
> >>>>>>>> I don't know how to range scan over a caching store, probably one
> >>>>>>>> had to open 2 iterators and merge them.
> >>>>>>>>
> >>>>>>>> Other than that, I still think even the regular join is broken with
> >>>>>>>> caching enabled, right? I once filed a ticket, because with caching
> >>>>>>>> enabled it would return values that haven't been published
> >>>>>>>> downstream yet.
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
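To make Guozhang's hashing proposal (quoted above) a bit more concrete, here is a minimal sketch of the idea, not an actual implementation, and the class and method names are invented for illustration: the LHS sends a hash of its serialized value to the RHS instead of the full payload, and when the RHS reply comes back, the LHS drops the joined result if the returned hash no longer matches the hash of its current value, so only the latest join result is emitted downstream.

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    // Sketch of the hash-comparison idea from the thread (illustrative names only).
    public final class ValueHashing {

        // Hash the serialized LHS value; MD5 is used here only because it was the
        // example given in the discussion -- any digest function would do.
        static byte[] hash(final byte[] serializedValue) {
            if (serializedValue == null) {
                return null;
            }
            try {
                return MessageDigest.getInstance("MD5").digest(serializedValue);
            } catch (final NoSuchAlgorithmException fatal) {
                throw new RuntimeException(fatal);
            }
        }

        // Called on the LHS when a reply (k1, hashSentEarlier, joinResult) arrives:
        // emit the result only if the hash still matches the latest LHS value, so
        // the stale (v1-based) result is discarded and only (k1, (f-k1, v2-v3)) is sent.
        static boolean shouldEmit(final byte[] hashFromReply, final byte[] currentSerializedValue) {
            return Arrays.equals(hashFromReply, hash(currentSerializedValue));
        }
    }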
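On the range-scan question Matthias answers above, the following is only an illustration of the general idea (it is not Kafka's internal code): a range query over a cached table can be served by overlaying the cache's dirty entries on top of the underlying store's entries for the same key range, so the caller sees one merged, ordered view instead of having to open and merge two iterators itself. Handling of cached deletions is omitted for brevity.

    import java.util.SortedMap;
    import java.util.TreeMap;

    // Illustration of merging cache and store for a range scan (not Kafka's code).
    public final class CachedRangeScanSketch {

        static SortedMap<String, String> range(final SortedMap<String, String> store,
                                               final SortedMap<String, String> cache,
                                               final String from,
                                               final String to) {
            // Start from the persistent store's view of the range ...
            final SortedMap<String, String> merged = new TreeMap<>(store.subMap(from, to));
            // ... then let the cache's entries shadow the store's entries.
            merged.putAll(cache.subMap(from, to));
            return merged;
        }
    }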