For the sake of expediency, I updated the KIP with what I believe we have
discussed.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges



On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> My only concern was around compaction of records in the repartition topic.
> This would simply mean that these records would stick around as their value
> isn't truly null. Though I know about the usage of compaction on changelog
> topics, I am a bit fuzzy on where we use compaction in other internal
> topics. So long as this doesn't cause concern I can certainly finish off
> the KIP today.
>
> Thanks
>
> Adam
>
> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I agree that the LHS side must encode this information and tell the RHS
>> if a tombstone requires a reply or not.
>>
>> >> Would this pose some sort of verbosity problem in the internal topics,
>> >> especially if we have to rebuild state off of them?
>>
>> I don't see an issue atm. Can you elaborate how this relates to rebuild
>> state?
>>
>>
>> -Matthias
>>
>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>> > Hi Matthias
>> >
>> > I have been mulling over the unsubscribe / delete optimization, and I
>> have
>> > one main concern. I believe that the RHS can only determine whether to
>> > propagate the tombstone or not based on the value passed over from the
>> LHS.
>> > This value would need to be non-null, and so wouldn't the internal
>> > repartition topics end up containing many non-null "tombstone" values?
>> >
>> > ie:
>> > Normal tombstone (propagate):     (key=123, value=null)
>> > Don't-propagate-tombstone:          (key=123, value=("don't propagate
>> me,
>> > but please delete state"))
>> >
>> > Would this pose some sort of verbosity problem in the internal topics,
>> > especially if we have to rebuild state off of them?
>> >
>> > Thanks
>> >
>> > Adam
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> SGTM.
>> >>
>> >> I also had the impression that those duplicates are rather an error
>> than
>> >> an case of eventual consistency. Using hashing to avoid sending the
>> >> payload is a good idea IMHO.
>> >>
>> >> @Adam: can you update the KIP accordingly?
>> >>
>> >>  - add the optimization to not send a reply from RHS to LHS on
>> >> unsubscribe (if not a tombstone)
>> >>  - explain why using offsets to avoid duplicates does not work
>> >>  - add hashing to avoid duplicates
>> >>
>> >> Beside this, I don't have any further comments. Excited to finally get
>> >> this in!
>> >>
>> >> Let us know when you have updated the KIP so we can move forward with
>> >> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/8/19 8:47 AM, John Roesler wrote:
>> >>> Hi all,
>> >>>
>> >>> This proposal sounds good to me, especially since we observe that
>> people
>> >>> are already confused when the see duplicate results coming out of 1:1
>> >> joins
>> >>> (which is a bug). I take this as "evidence" that we're better off
>> >>> eliminating those duplicates from the start. Guozhang's proposal seems
>> >> like
>> >>> a lightweight solution to the problem, so FWIW, I'm in favor.
>> >>>
>> >>> Thanks,
>> >>> -John
>> >>>
>> >>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <
>> adam.bellem...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi Guozhang
>> >>>>
>> >>>> That would certainly work for eliminating those duplicate values. As
>> it
>> >>>> stands right now, this would be consistent with swallowing changes
>> due
>> >> to
>> >>>> out-of-order processing with multiple threads, and seems like a very
>> >>>> reasonable way forward. Thank you for the suggestion!
>> >>>>
>> >>>> I have been trying to think if there are any other scenarios where we
>> >> can
>> >>>> end up with duplicates, though I have not been able to identify any
>> >> others
>> >>>> at the moment. I will think on it a bit more, but if anyone else has
>> any
>> >>>> ideas, please chime in.
>> >>>>
>> >>>> Thanks,
>> >>>> Adam
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
>> >> wrote:
>> >>>>
>> >>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
>> >> separate
>> >>>>> this issue with the more general issue that today (not only
>> >> foreign-key,
>> >>>>> but also co-partition primary-key) table-table joins is still not
>> >>>> strictly
>> >>>>> respecting the timestamp ordering since the two changelog streams
>> may
>> >> be
>> >>>>> fetched and hence processed out-of-order and we do not allow a
>> record
>> >> to
>> >>>> be
>> >>>>> joined with the other table at any given time snapshot yet. So
>> ideally
>> >>>> when
>> >>>>> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2))
>> >> coming
>> >>>>> at the left hand table and one record (f-k1, v3) at the right hand
>> >> table,
>> >>>>> depending on the processing ordering we may get:
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> or
>> >>>>>
>> >>>>> (k1, (f-k1, v1-v3))
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> And this is not to be addressed by this KIP.
>> >>>>>
>> >>>>> What I would advocate is to fix the issue that is introduced in this
>> >> KIP
>> >>>>> alone, that is we may have
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> I admit that it does not have correctness issue from the semantics
>> >> along,
>> >>>>> comparing it with "discarding the first result", but it may be
>> >> confusing
>> >>>>> from user's observation who do not expect to see the seemingly
>> >>>> duplicates.
>> >>>>> On the other hand, I think there's a light solution to avoid it,
>> which
>> >> is
>> >>>>> that we can still optimize away to not send the full payload of "v1"
>> >> from
>> >>>>> left hand side to right hand side, but instead of just trimming off
>> the
>> >>>>> whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm using
>> MD5
>> >>>>> here just as an example, we can definitely replace it with other
>> >>>>> functions), by doing which we can discard the join operation if the
>> >> hash
>> >>>>> value sent back from the right hand side does not match with the
>> left
>> >>>> hand
>> >>>>> side any more, i.e. we will only send:
>> >>>>>
>> >>>>> (k1, (f-k1, v2-v3))
>> >>>>>
>> >>>>> to down streams once.
>> >>>>>
>> >>>>> WDYT?
>> >>>>>
>> >>>>>
>> >>>>> Guozhang
>> >>>>>
>> >>>>>
>> >>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <
>> >> adam.bellem...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Ah yes, I recall it all now. That answers that question as to why I
>> >> had
>> >>>>>> caching disabled. I can certainly re-enable it since I believe the
>> >> main
>> >>>>>> concern was simply about reconciling those two iterators. A lack of
>> >>>>>> knowledge there on my part.
>> >>>>>>
>> >>>>>>
>> >>>>>> Thank you John for weighing in - we certainly both do appreciate
>> it. I
>> >>>>>> think that John hits it on the head though with his comment of "If
>> it
>> >>>>> turns
>> >>>>>> out we're wrong about this, then it should be possible to fix the
>> >>>>> semantics
>> >>>>>> in place, without messing with the API."
>> >>>>>>
>> >>>>>> If anyone else would like to weigh in, your thoughts would be
>> greatly
>> >>>>>> appreciated.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <
>> matth...@confluent.io
>> >>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>>>> I dont know how to range scan over a caching store, probably one
>> >>>> had
>> >>>>>>>>> to open 2 iterators and merge them.
>> >>>>>>>
>> >>>>>>> That happens automatically. If you query a cached KTable, it
>> ranges
>> >>>>> over
>> >>>>>>> the cache and the underlying RocksDB and performs the merging
>> under
>> >>>> the
>> >>>>>>> hood.
>> >>>>>>>
>> >>>>>>>>> Other than that, I still think even the regualr join is broken
>> >>>> with
>> >>>>>>>>> caching enabled right?
>> >>>>>>>
>> >>>>>>> Why? To me, if you use the word "broker", it implies conceptually
>> >>>>>>> incorrect; I don't see this.
>> >>>>>>>
>> >>>>>>>> I once files a ticket, because with caching
>> >>>>>>>>>> enabled it would return values that havent been published
>> >>>>> downstream
>> >>>>>>> yet.
>> >>>>>>>
>> >>>>>>> For the bug report, if found
>> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need
>> to
>> >>>> fix
>> >>>>>>> this, but it is a regular bug as any other, and we should not
>> change
>> >>>> a
>> >>>>>>> design because of a bug.
>> >>>>>>>
>> >>>>>>> That range() returns values that have not been published
>> downstream
>> >>>> if
>> >>>>>>> caching is enabled is how caching works and is intended behavior.
>> Not
>> >>>>>>> sure why say it's incorrect?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>> >>>>>>>>> Thanks Adam,
>> >>>>>>>>>
>> >>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
>> >>>> no
>> >>>>>>>>> functional/correctness requirement to disable caching. I cannot
>> >>>>>>>>> remember why Jan's proposal added this? It might be an
>> >>>>>>>>> implementation detail though (maybe just remove it from the KIP?
>> >>>>>>>>> -- might be miss leading).
>> >>>>>>>>
>> >>>>>>>> I dont know how to range scan over a caching store, probably one
>> >>>> had
>> >>>>>>>> to open 2 iterators and merge them.
>> >>>>>>>>
>> >>>>>>>> Other than that, I still think even the regualr join is broken
>> with
>> >>>>>>>> caching enabled right? I once files a ticket, because with
>> caching
>> >>>>>>>> enabled it would return values that havent been published
>> >>>> downstream
>> >>>>>> yet.
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> -- Guozhang
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>

Reply via email to