I'd agree with Adam as well: we could consider optimizations leveraging headers as a separate topic that applies more broadly in the future.
On Mon, Mar 11, 2019 at 11:29 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Hi John
>
> Thanks for the explanation. I wasn't sure how KTable repartition topics
> were handled with regard to cleanup, but I just wanted to double check to
> see if it could cause an issue.
>
> @Matthias
> My inclination is to keep the DSL topologies consistent with one another.
> I am a bit concerned about scope creep into the header domain, and I am
> not sure how much performance would improve vs. the additional complexity.
> If we go down this approach, I think we should consider a new type of
> internal topic so that it's not confused with the existing repartition
> and changelog topic types. Otherwise, I am more inclined to keep it
> consistent and separated into a normal repartition topic and a normal
> changelog topic.
>
> Thanks
> Adam
>
> On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> I guess Adam suggests using compaction for the repartition topic and not
> purging data. Doing this would allow us to avoid a store changelog topic
> for the "subscription store" on the RHS. This would be a nice
> optimization.
>
> But the concern about breaking compaction is correct. However, I see it
> as an optimization only, and thus, if we keep the topic as a plain
> repartition topic and use a separate store changelog topic, the issue
> resolves itself.
>
> Maybe we could use headers, though, to get this optimization. Do you
> think it's worth doing this optimization, or should we just stick with
> the simple design and two topics (repartition plus changelog)?
>
> @Adam: thanks for updating the Wiki page. LGTM.
>
> -Matthias
>
> On 3/11/19 9:24 AM, John Roesler wrote:
>
> Hey Adam,
>
> That's a good observation, but it wouldn't be a problem for repartition
> topics, because Streams aggressively deletes messages from the
> repartition topics once it knows they are handled. Thus, we don't need to
> try and cater to the log compactor.
>
> Thanks,
> -John
>
> On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> For the sake of expediency, I updated the KIP with what I believe we have
> discussed.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
>
> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> My only concern was around compaction of records in the repartition
> topic. This would simply mean that these records would stick around, as
> their value isn't truly null. Though I know about the usage of compaction
> on changelog topics, I am a bit fuzzy on where we use compaction in other
> internal topics. So long as this doesn't cause concern, I can certainly
> finish off the KIP today.
>
> Thanks
>
> Adam
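To make the two internal topic flavors under discussion concrete, here is a
minimal sketch, using the plain admin client, of how a purge-based
repartition topic differs from a compacted changelog topic. The topic names,
partition counts, and replication factor are illustrative only, and Streams
creates and configures its internal topics itself; this is just to show the
two cleanup policies side by side.

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;

    public class InternalTopicFlavorsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Repartition-style topic: cleanup.policy=delete with unlimited retention;
                // records are explicitly purged once the application has handled them, so
                // the log compactor never has to reason about "instruction" values.
                NewTopic repartition = new NewTopic("my-app-fk-subscription-repartition", 6, (short) 3)
                        .configs(Map.of(
                                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE,
                                TopicConfig.RETENTION_MS_CONFIG, "-1"));

                // Changelog-style topic: compacted, so the latest value per key is retained
                // and the state store can be rebuilt from it after a failure.
                NewTopic changelog = new NewTopic("my-app-fk-subscription-store-changelog", 6, (short) 3)
                        .configs(Map.of(
                                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));

                admin.createTopics(Arrays.asList(repartition, changelog)).all().get();
            }
        }
    }

Keeping the repartition topic on the delete policy (as John describes above)
is what makes the compaction concern moot: records that are deleted once
handled never meet the log compactor in the first place.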
> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> I agree that the LHS must encode this information and tell the RHS
> whether a tombstone requires a reply or not.
>
> > Would this pose some sort of verbosity problem in the internal topics,
> > especially if we have to rebuild state off of them?
>
> I don't see an issue atm. Can you elaborate on how this relates to
> rebuilding state?
>
> -Matthias
>
> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>
> Hi Matthias
>
> I have been mulling over the unsubscribe/delete optimization, and I have
> one main concern. I believe that the RHS can only determine whether to
> propagate the tombstone or not based on the value passed over from the
> LHS. This value would need to be non-null, and so wouldn't the internal
> repartition topics end up containing many non-null "tombstone" values?
>
> i.e.:
> Normal tombstone (propagate):  (key=123, value=null)
> Don't-propagate tombstone:     (key=123, value=("don't propagate me, but please delete state"))
>
> Would this pose some sort of verbosity problem in the internal topics,
> especially if we have to rebuild state off of them?
>
> Thanks
>
> Adam
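For illustration only, the kind of non-null "instruction" value Adam
describes might look roughly like the following sketch; the class and enum
names are hypothetical and are not the KIP's actual wire format.

    // A deletion that must be propagated downstream and a deletion that should only
    // clean up the RHS subscription state are distinguished by an explicit instruction,
    // which is why the record value cannot simply be null.
    public final class SubscriptionValueSketch<V> {

        public enum Instruction {
            DELETE_AND_PROPAGATE,   // normal tombstone: delete state and emit a join tombstone
            DELETE_NO_PROPAGATE     // e.g. foreign key changed: delete old state, emit nothing
        }

        private final Instruction instruction;
        private final V value;      // null for deletions, non-null for regular updates

        public SubscriptionValueSketch(final Instruction instruction, final V value) {
            this.instruction = instruction;
            this.value = value;
        }

        public Instruction instruction() {
            return instruction;
        }

        public V value() {
            return value;
        }
    }

Because such records carry a non-null payload, they are exactly the values
the verbosity and compaction questions above are about.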
> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> SGTM.
>
> I also had the impression that those duplicates are rather an error than
> a case of eventual consistency. Using hashing to avoid sending the
> payload is a good idea IMHO.
>
> @Adam: can you update the KIP accordingly?
>
> - add the optimization to not send a reply from RHS to LHS on
>   unsubscribe (if not a tombstone)
> - explain why using offsets to avoid duplicates does not work
> - add hashing to avoid duplicates
>
> Besides this, I don't have any further comments. Excited to finally get
> this in!
>
> Let us know when you have updated the KIP so we can move forward with the
> VOTE. Thanks a lot for your patience! This was a very loooong shot!
>
> -Matthias
>
> On 3/8/19 8:47 AM, John Roesler wrote:
>
> Hi all,
>
> This proposal sounds good to me, especially since we observe that people
> are already confused when they see duplicate results coming out of 1:1
> joins (which is a bug). I take this as "evidence" that we're better off
> eliminating those duplicates from the start. Guozhang's proposal seems
> like a lightweight solution to the problem, so FWIW, I'm in favor.
>
> Thanks,
> -John
>
> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> Hi Guozhang
>
> That would certainly work for eliminating those duplicate values. As it
> stands right now, this would be consistent with swallowing changes due to
> out-of-order processing with multiple threads, and seems like a very
> reasonable way forward. Thank you for the suggestion!
>
> I have been trying to think if there are any other scenarios where we can
> end up with duplicates, though I have not been able to identify any
> others at the moment. I will think on it a bit more, but if anyone else
> has any ideas, please chime in.
>
> Thanks,
> Adam
>
> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> One more thought regarding *c-P2: Duplicates)*: first, I want to separate
> this issue from the more general issue that today (not only foreign-key,
> but also co-partitioned primary-key) table-table joins still do not
> strictly respect timestamp ordering, since the two changelog streams may
> be fetched and hence processed out of order, and we do not yet allow a
> record to be joined with the other table at any given time snapshot. So
> ideally, when there are two changelog records (k1, (f-k1, v1)),
> (k1, (f-k1, v2)) arriving at the left-hand table and one record
> (f-k1, v3) at the right-hand table, depending on the processing order we
> may get:
>
> (k1, (f-k1, v2-v3))
>
> or
>
> (k1, (f-k1, v1-v3))
> (k1, (f-k1, v2-v3))
>
> And this is not to be addressed by this KIP.
>
> What I would advocate is to fix the issue that is introduced in this KIP
> alone, that is, we may have
>
> (k1, (f-k1, v2-v3))    // this should actually be v1-v3
> (k1, (f-k1, v2-v3))
>
> I admit that it is not a correctness issue from the semantics alone,
> compared with "discarding the first result", but it may be confusing to
> users who do not expect to see the seeming duplicates. On the other hand,
> I think there's a lightweight way to avoid it: we can still optimize by
> not sending the full payload of "v1" from the left-hand side to the
> right-hand side, but instead of just trimming off the bytes entirely, we
> can send, e.g., an MD5 hash of the bytes (I'm using MD5 here just as an
> example; we can definitely replace it with other functions). By doing so
> we can discard the join result if the hash value sent back from the
> right-hand side no longer matches the left-hand side, i.e. we will only
> send
>
> (k1, (f-k1, v2-v3))
>
> downstream once.
>
> WDYT?
>
> Guozhang
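A small sketch of the hash comparison Guozhang proposes (names are
hypothetical, and MD5 is used only because it is the example in the
discussion): the left-hand side ships a digest of its current value instead
of the full payload, and when the right-hand side echoes that digest back, a
result is only emitted if the digest still matches the value currently
stored on the left-hand side.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    public final class StaleResponseFilterSketch {

        static byte[] digest(final byte[] serializedValue) {
            try {
                return MessageDigest.getInstance("MD5").digest(serializedValue);
            } catch (final NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }

        // The RHS response should only be joined and emitted if it was computed
        // against the value that is still current on the LHS.
        static boolean shouldEmit(final byte[] digestEchoedByRhs, final byte[] currentLhsValue) {
            return Arrays.equals(digestEchoedByRhs, digest(currentLhsValue));
        }

        public static void main(final String[] args) {
            final byte[] v1 = "v1".getBytes(StandardCharsets.UTF_8);
            final byte[] v2 = "v2".getBytes(StandardCharsets.UTF_8);

            final byte[] sentWithV1 = digest(v1); // digest shipped to the RHS with the subscription

            // By the time the RHS replies, the LHS has moved on to v2, so the stale
            // v1-based response is dropped and only one v2-based result goes downstream.
            System.out.println(shouldEmit(sentWithV1, v2)); // false -> discard
            System.out.println(shouldEmit(digest(v2), v2)); // true  -> emit
        }
    }

This is what collapses the (v2-v3), (v2-v3) pair above into a single emitted
result without shipping the full v1 payload to the right-hand side.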
That answers that question as to > > >> why I > > >>>>>> had > > >>>>>>>>>> caching disabled. I can certainly re-enable it since I believe > > >> the > > >>>>>> main > > >>>>>>>>>> concern was simply about reconciling those two iterators. A > lack > > >> of > > >>>>>>>>>> knowledge there on my part. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Thank you John for weighing in - we certainly both do > appreciate > > >>>> it. I > > >>>>>>>>>> think that John hits it on the head though with his comment of > > >> "If > > >>>> it > > >>>>>>>>> turns > > >>>>>>>>>> out we're wrong about this, then it should be possible to fix > > the > > >>>>>>>>> semantics > > >>>>>>>>>> in place, without messing with the API." > > >>>>>>>>>> > > >>>>>>>>>> If anyone else would like to weigh in, your thoughts would be > > >>>> greatly > > >>>>>>>>>> appreciated. > > >>>>>>>>>> > > >>>>>>>>>> Thanks > > >>>>>>>>>> > > >>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax < > > >>>> matth...@confluent.io > > >>>>>>> > > >>>>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>>>> I dont know how to range scan over a caching store, > probably > > >> one > > >>>>>>>> had > > >>>>>>>>>>>>> to open 2 iterators and merge them. > > >>>>>>>>>>> > > >>>>>>>>>>> That happens automatically. If you query a cached KTable, it > > >>>> ranges > > >>>>>>>>> over > > >>>>>>>>>>> the cache and the underlying RocksDB and performs the merging > > >>>> under > > >>>>>>>> the > > >>>>>>>>>>> hood. > > >>>>>>>>>>> > > >>>>>>>>>>>>> Other than that, I still think even the regualr join is > > broken > > >>>>>>>> with > > >>>>>>>>>>>>> caching enabled right? > > >>>>>>>>>>> > > >>>>>>>>>>> Why? To me, if you use the word "broker", it implies > > >> conceptually > > >>>>>>>>>>> incorrect; I don't see this. > > >>>>>>>>>>> > > >>>>>>>>>>>> I once files a ticket, because with caching > > >>>>>>>>>>>>>> enabled it would return values that havent been published > > >>>>>>>>> downstream > > >>>>>>>>>>> yet. > > >>>>>>>>>>> > > >>>>>>>>>>> For the bug report, if found > > >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still > > need > > >>>> to > > >>>>>>>> fix > > >>>>>>>>>>> this, but it is a regular bug as any other, and we should not > > >>>> change > > >>>>>>>> a > > >>>>>>>>>>> design because of a bug. > > >>>>>>>>>>> > > >>>>>>>>>>> That range() returns values that have not been published > > >>>> downstream > > >>>>>>>> if > > >>>>>>>>>>> caching is enabled is how caching works and is intended > > >> behavior. > > >>>> Not > > >>>>>>>>>>> sure why say it's incorrect? > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> -Matthias > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote: > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote: > > >>>>>>>>>>>>> Thanks Adam, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, > there > > >> is > > >>>>>>>> no > > >>>>>>>>>>>>> functional/correctness requirement to disable caching. I > > >> cannot > > >>>>>>>>>>>>> remember why Jan's proposal added this? It might be an > > >>>>>>>>>>>>> implementation detail though (maybe just remove it from the > > >> KIP? > > >>>>>>>>>>>>> -- might be miss leading). > > >>>>>>>>>>>> > > >>>>>>>>>>>> I dont know how to range scan over a caching store, probably > > >> one > > >>>>>>>> had > > >>>>>>>>>>>> to open 2 iterators and merge them. 
> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>
> On 04.03.2019 19:14, Matthias J. Sax wrote:
>
> > Thanks Adam,
> >
> > Q) Range scans work with caching enabled, too. Thus, there is no
> > functional/correctness requirement to disable caching. I cannot
> > remember why Jan's proposal added this? It might be an implementation
> > detail, though (maybe just remove it from the KIP? -- might be
> > misleading).
>
> I don't know how to range scan over a caching store; probably one would
> have to open 2 iterators and merge them.
>
> Other than that, I still think even the regular join is broken with
> caching enabled, right? I once filed a ticket, because with caching
> enabled it would return values that haven't been published downstream
> yet.

--
-- Guozhang