Hi John,

Thanks for the explanation. I wasn't sure how KTable repartition topics were handled with regards to cleanup, but I just wanted to double-check to see if it could cause an issue.

@Matthias: My inclination is to keep the DSL topologies consistent with one another. I am a bit concerned about scope creep into the header domain, and I am not sure how much performance would be improved vs. the additional complexity. I think if we go down this approach we should consider a new type of internal topic, so that it's not confused with the existing repartition and changelog topic types. Otherwise, I am more inclined to keep it consistent and separated into a normal repartition topic and a normal changelog topic.

Thanks,
Adam

On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io> wrote:

> I guess Adam suggests using compaction for the repartition topic and not purging data. Doing this would allow us to avoid a store changelog topic for the "subscription store" on the RHS. This would be a nice optimization.
>
> But the concern about breaking compaction is correct. However, I see it as an optimization only, and thus, if we keep the topic as a plain repartition topic and use a separate store changelog topic, the issue resolves itself.
>
> Maybe we could use headers, though, to get this optimization. Do you think it's worth doing this optimization, or should we just stick with the simple design and two topics (repartition plus changelog)?
>
> @Adam: thanks for updating the Wiki page. LGTM.
>
> -Matthias
>
> On 3/11/19 9:24 AM, John Roesler wrote:
> > Hey Adam,
> >
> > That's a good observation, but it wouldn't be a problem for repartition topics, because Streams aggressively deletes messages from the repartition topics once it knows they are handled. Thus, we don't need to try and cater to the log compactor.
> >
> > Thanks,
> > -John
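For context, the "aggressive deletion" John mentions amounts to an explicit delete-records request against the repartition topic once the records are known to be fully processed. A minimal sketch with the plain admin client follows; the topic name, partition, and offset are illustrative only, and Streams drives this internally from its own task bookkeeping rather than through user code.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class RepartitionPurgeSketch {

    // Delete everything below the given offset for one repartition-topic partition,
    // roughly what Streams does once it knows those records have been handled.
    public static void purgeUpTo(AdminClient admin, TopicPartition tp, long upToOffset) throws Exception {
        Map<TopicPartition, RecordsToDelete> request =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(upToOffset));
        admin.deleteRecords(request).all().get();
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Hypothetical repartition topic name, partition, and offset.
            purgeUpTo(admin, new TopicPartition("my-app-fk-subscription-repartition", 0), 42L);
        }
    }
}

Because the topic is purged this way, it can keep a plain delete cleanup policy and never needs to be compaction-friendly, which is John's point above.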
> > On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >
> >> For the sake of expediency, I updated the KIP with what I believe we have discussed.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
> >>
> >> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>
> >>> My only concern was around compaction of records in the repartition topic. This would simply mean that these records would stick around, as their value isn't truly null. Though I know about the usage of compaction on changelog topics, I am a bit fuzzy on where we use compaction in other internal topics. So long as this doesn't cause concern, I can certainly finish off the KIP today.
> >>>
> >>> Thanks
> >>>
> >>> Adam
> >>>
> >>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> I agree that the LHS must encode this information and tell the RHS whether a tombstone requires a reply or not.
> >>>>
> >>>>> Would this pose some sort of verbosity problem in the internal topics, especially if we have to rebuild state off of them?
> >>>>
> >>>> I don't see an issue atm. Can you elaborate on how this relates to rebuilding state?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> >>>>> Hi Matthias
> >>>>>
> >>>>> I have been mulling over the unsubscribe/delete optimization, and I have one main concern. I believe that the RHS can only determine whether to propagate the tombstone or not based on the value passed over from the LHS. This value would need to be non-null, and so wouldn't the internal repartition topics end up containing many non-null "tombstone" values?
> >>>>>
> >>>>> ie:
> >>>>>   Normal tombstone (propagate):  (key=123, value=null)
> >>>>>   Don't-propagate-tombstone:     (key=123, value=("don't propagate me, but please delete state"))
> >>>>>
> >>>>> Would this pose some sort of verbosity problem in the internal topics, especially if we have to rebuild state off of them?
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Adam
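To make the non-null "tombstone" concern concrete, the sketch below shows one possible shape for the repartition-topic value, with an instruction field telling the RHS whether to delete its subscription state and whether a reply should be propagated. The names and layout are hypothetical, not the KIP's actual wire format.

// Hypothetical encoding of the subscription (repartition-topic) value, for
// illustration only; the KIP's actual serialization may differ.
public class SubscriptionValueSketch {

    public enum Instruction {
        SUBSCRIBE,                   // regular (re-)subscription for a foreign key
        DELETE_STATE_AND_PROPAGATE,  // real tombstone on the LHS: delete RHS-side state and reply
        DELETE_STATE_NO_PROPAGATE    // e.g. foreign key changed: delete RHS-side state, no reply needed
    }

    public final Instruction instruction;
    public final byte[] payload;     // empty for the delete cases, but the record itself is non-null

    public SubscriptionValueSketch(Instruction instruction, byte[] payload) {
        this.instruction = instruction;
        this.payload = payload;
    }
}

The point of Adam's question is that both delete cases are still non-null records in the repartition topic, so log compaction would retain them instead of cleaning them up the way it cleans up real tombstones.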
> >>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> SGTM.
> >>>>>>
> >>>>>> I also had the impression that those duplicates are rather an error than a case of eventual consistency. Using hashing to avoid sending the payload is a good idea IMHO.
> >>>>>>
> >>>>>> @Adam: can you update the KIP accordingly?
> >>>>>>
> >>>>>> - add the optimization to not send a reply from RHS to LHS on unsubscribe (if not a tombstone)
> >>>>>> - explain why using offsets to avoid duplicates does not work
> >>>>>> - add hashing to avoid duplicates
> >>>>>>
> >>>>>> Besides this, I don't have any further comments. Excited to finally get this in!
> >>>>>>
> >>>>>> Let us know when you have updated the KIP so we can move forward with the VOTE. Thanks a lot for your patience! This was a very loooong shot!
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> This proposal sounds good to me, especially since we observe that people are already confused when they see duplicate results coming out of 1:1 joins (which is a bug). I take this as "evidence" that we're better off eliminating those duplicates from the start. Guozhang's proposal seems like a lightweight solution to the problem, so FWIW, I'm in favor.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang
> >>>>>>>>
> >>>>>>>> That would certainly work for eliminating those duplicate values. As it stands right now, this would be consistent with swallowing changes due to out-of-order processing with multiple threads, and it seems like a very reasonable way forward. Thank you for the suggestion!
> >>>>>>>>
> >>>>>>>> I have been trying to think of any other scenarios where we can end up with duplicates, though I have not been able to identify any others at the moment. I will think on it a bit more, but if anyone else has any ideas, please chime in.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Adam
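A minimal sketch of the hashing idea Matthias and Adam refer to above, assuming the LHS hashes its serialized old value and ships only the digest with the subscription record instead of the full payload. MD5 is just the example algorithm from this thread, and the helper name is made up.

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: hash of the serialized LHS value that travels with the
// subscription record to the RHS in place of the full (possibly large) old value.
public final class ValueHash {

    private ValueHash() { }

    public static byte[] of(byte[] serializedValue) {
        if (serializedValue == null) {
            return null; // tombstone: nothing to hash
        }
        try {
            return MessageDigest.getInstance("MD5").digest(serializedValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest not available", e);
        }
    }
}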
> >>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first I want to separate this issue from the more general issue that today (not only foreign-key, but also co-partitioned primary-key) table-table joins still do not strictly respect timestamp ordering, since the two changelog streams may be fetched and hence processed out-of-order, and we do not yet allow a record to be joined with the other table at any given time snapshot. So ideally, when there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left hand table and one record (f-k1, v3) at the right hand table, depending on the processing ordering we may get:
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v1-v3))
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> And this is not to be addressed by this KIP.
> >>>>>>>>>
> >>>>>>>>> What I would advocate is to fix the issue that is introduced in this KIP alone, that is, we may have
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> I admit that it does not have a correctness issue from the semantics alone, compared with "discarding the first result", but it may be confusing to users who do not expect to see the seeming duplicates. On the other hand, I think there's a lightweight solution to avoid it: we can still optimize away sending the full payload of "v1" from the left hand side to the right hand side, but instead of just trimming off the bytes entirely, we can send, e.g., an MD5 hash of the bytes (I'm using MD5 here just as an example; we can definitely replace it with other functions). By doing this, we can discard the join operation if the hash value sent back from the right hand side no longer matches the left hand side, i.e. we will only send:
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> downstream once.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>> Guozhang
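A rough sketch of the matching check Guozhang describes on the LHS, reusing the ValueHash helper above (again with hypothetical names, not the KIP's actual classes): the RHS reply carries the hash of the LHS value it was computed against, and the reply is dropped if that value has changed in the meantime, so only the latest join result is forwarded downstream.

import java.util.Arrays;

// Hypothetical resolver-side check on the LHS, building on the ValueHash sketch.
public class ResponseResolverSketch {

    // Returns true if the RHS reply still matches the current LHS value and should be
    // joined and forwarded, false if the LHS value has changed since subscribing.
    public static boolean isStillCurrent(byte[] hashFromRhsReply, byte[] currentSerializedLhsValue) {
        byte[] currentHash = ValueHash.of(currentSerializedLhsValue);
        return Arrays.equals(hashFromRhsReply, currentHash);
    }
}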
> >>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Ah yes, I recall it all now. That answers the question of why I had caching disabled. I can certainly re-enable it, since I believe the main concern was simply about reconciling those two iterators. A lack of knowledge there on my part.
> >>>>>>>>>>
> >>>>>>>>>> Thank you, John, for weighing in - we certainly both do appreciate it. I think that John hits it on the head, though, with his comment of "If it turns out we're wrong about this, then it should be possible to fix the semantics in place, without messing with the API."
> >>>>>>>>>>
> >>>>>>>>>> If anyone else would like to weigh in, your thoughts would be greatly appreciated.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> I don't know how to range scan over a caching store; probably one would have to open 2 iterators and merge them.
> >>>>>>>>>>>
> >>>>>>>>>>> That happens automatically. If you query a cached KTable, it ranges over the cache and the underlying RocksDB and performs the merging under the hood.
> >>>>>>>>>>>
> >>>>>>>>>>>> Other than that, I still think even the regular join is broken with caching enabled, right?
> >>>>>>>>>>>
> >>>>>>>>>>> Why? To me, if you use the word "broken", it implies conceptually incorrect; I don't see this.
> >>>>>>>>>>>
> >>>>>>>>>>>> I once filed a ticket, because with caching enabled it would return values that haven't been published downstream yet.
> >>>>>>>>>>>
> >>>>>>>>>>> For the bug report, I found https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix this, but it is a regular bug like any other, and we should not change a design because of a bug.
> >>>>>>>>>>>
> >>>>>>>>>>> That range() returns values that have not been published downstream if caching is enabled is how caching works and is intended behavior. Not sure why you say it's incorrect?
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> >>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
> >>>>>>>>>>>>> Thanks Adam,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is no functional/correctness requirement to disable caching. I cannot remember why Jan's proposal added this? It might be an implementation detail though (maybe just remove it from the KIP? -- might be misleading).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I don't know how to range scan over a caching store; probably one would have to open 2 iterators and merge them.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Other than that, I still think even the regular join is broken with caching enabled, right? I once filed a ticket, because with caching enabled it would return values that haven't been published downstream yet.
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
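For readers unfamiliar with how the cached range scan works, the merge Matthias describes above can be pictured as two sorted iterators zipped together, with cache entries shadowing store entries for the same key. The sketch below is only a simplified illustration of that idea, not Streams' actual caching-layer code, which additionally has to deal with cache tombstones, evictions, and byte-level keys.

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Simplified merge of a sorted cache iterator with a sorted underlying-store iterator.
// For equal keys the cache entry wins, because it is the newer, not-yet-flushed value.
public class MergedRangeIteratorSketch implements Iterator<Map.Entry<String, String>> {

    private final Iterator<Map.Entry<String, String>> cacheIter;
    private final Iterator<Map.Entry<String, String>> storeIter;
    private Map.Entry<String, String> nextCache;
    private Map.Entry<String, String> nextStore;

    public MergedRangeIteratorSketch(Iterator<Map.Entry<String, String>> cacheIter,
                                     Iterator<Map.Entry<String, String>> storeIter) {
        this.cacheIter = cacheIter;
        this.storeIter = storeIter;
        this.nextCache = advance(cacheIter);
        this.nextStore = advance(storeIter);
    }

    private static Map.Entry<String, String> advance(Iterator<Map.Entry<String, String>> it) {
        return it.hasNext() ? it.next() : null;
    }

    @Override
    public boolean hasNext() {
        return nextCache != null || nextStore != null;
    }

    @Override
    public Map.Entry<String, String> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        if (nextStore == null
                || (nextCache != null && nextCache.getKey().compareTo(nextStore.getKey()) <= 0)) {
            if (nextStore != null && nextCache.getKey().equals(nextStore.getKey())) {
                nextStore = advance(storeIter); // same key in both: the store entry is shadowed
            }
            Map.Entry<String, String> result = nextCache;
            nextCache = advance(cacheIter);
            return result;
        }
        Map.Entry<String, String> result = nextStore;
        nextStore = advance(storeIter);
        return result;
    }
}

Because the cache side of the merge includes entries that have not yet been flushed, a range scan can return values that have not been forwarded downstream, which is the intended caching behavior Matthias describes above.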