I'd agree with Adam as well: we could consider optimizations leveraging
headers as a separate topic that applies more broadly in the future.

On Mon, Mar 11, 2019 at 11:29 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi John
>
> Thanks for the explanation. I wasn't sure how KTable repartition topics
> were handled with regard to cleanup, but I just wanted to double-check to
> see if it could cause an issue.
>
> @Matthias
> My inclination is to keep the DSL topologies consistent with one another. I
> am a bit concerned about scope creep into the header domain, and I am not
> sure how much performance would be improved vs. additional complexity. I
> think if we go down this approach we should consider a new type of internal
> topic so that it's not confused with existing repartition and changelog
> topic types. I am more inclined to go with keeping it consistent and
> separated into a normal repartition topic and a normal changelog topic
> otherwise.
>
> Thanks
> Adam
>
>
>
>
>
>
> On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I guess Adam suggests using compaction for the repartition topic and
> > not purging data. Doing this would allow us to avoid a store changelog
> > topic for the "subscription store" on the RHS. This would be a nice
> > optimization.
> >
> > But the concern about breaking compaction is correct. However, I see it
> > as an optimization only and thus, if we keep the topic as plain
> > repartition topic and use a separate store changelog topic the issue
> > resolves itself.
> >
> > Maybe we could use headers, though, to get this optimization. Do you
> > think it's worth doing this optimization, or should we just stick with
> > the simple design and two topics (repartition plus changelog)?
> >
> >
> >
> > @Adam: thanks for updating the Wiki page. LGTM.
> >
> >
> > -Matthias
> >
> >
> > On 3/11/19 9:24 AM, John Roesler wrote:
> > > Hey Adam,
> > >
> > > That's a good observation, but it wouldn't be a problem for repartition
> > > topics because Streams aggressively deletes messages from the
> > > repartition topics once it knows they are handled. Thus, we don't need
> > > to try and cater to the log compactor.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com>
> > > wrote:
> > >
> > >> For the sake of expediency, I updated the KIP with what I believe we
> > >> have discussed.
> > >>
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
> > >>
> > >> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <adam.bellem...@gmail.com>
> > >> wrote:
> > >>
> > >>> My only concern was around compaction of records in the repartition
> > >>> topic. This would simply mean that these records would stick around, as
> > >>> their value isn't truly null. Though I know about the usage of compaction
> > >>> on changelog topics, I am a bit fuzzy on where we use compaction in other
> > >>> internal topics. So long as this doesn't cause concern, I can certainly
> > >>> finish off the KIP today.
> > >>>
> > >>> Thanks
> > >>>
> > >>> Adam
> > >>>
> > >>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> I agree that the LHS must encode this information and tell the RHS
> > >>>> whether a tombstone requires a reply or not.
> > >>>>
> > >>>>>> Would this pose some sort of verbosity problem in the internal topics,
> > >>>>>> especially if we have to rebuild state off of them?
> > >>>>
> > >>>> I don't see an issue atm. Can you elaborate on how this relates to
> > >>>> rebuilding state?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> > >>>>> Hi Matthias
> > >>>>>
> > >>>>> I have been mulling over the unsubscribe / delete optimization, and I
> > >>>>> have one main concern. I believe that the RHS can only determine whether
> > >>>>> to propagate the tombstone or not based on the value passed over from the
> > >>>>> LHS. This value would need to be non-null, and so wouldn't the internal
> > >>>>> repartition topics end up containing many non-null "tombstone" values?
> > >>>>>
> > >>>>> i.e.:
> > >>>>> Normal tombstone (propagate):   (key=123, value=null)
> > >>>>> Don't-propagate tombstone:      (key=123, value=("don't propagate me, but please delete state"))
> > >>>>>
> > >>>>> Would this pose some sort of verbosity problem in the internal topics,
> > >>>>> especially if we have to rebuild state off of them?
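The two tombstone flavours above can be sketched as follows. This is only an illustration of the idea under discussion, not the KIP's wire format: the marker constant `DELETE_NO_PROPAGATE`, the function `handle_on_rhs`, and the dict-backed store are all hypothetical names introduced here.

```python
from typing import Optional, Tuple

# Hypothetical non-null marker: "delete state, but do not reply to the LHS".
# Because it is not null, a log compactor would not treat it as a tombstone.
DELETE_NO_PROPAGATE = "__delete_no_propagate__"


def handle_on_rhs(store: dict, key, value) -> Optional[Tuple[object, object]]:
    """Apply one LHS message to the RHS subscription store.

    Returns the reply to send back to the LHS, or None if no reply is needed.
    """
    if value is None:
        # Normal tombstone: delete state and propagate the delete.
        store.pop(key, None)
        return (key, None)
    if value == DELETE_NO_PROPAGATE:
        # Don't-propagate tombstone: delete state silently.
        store.pop(key, None)
        return None
    # Regular subscription update: remember it and reply.
    store[key] = value
    return (key, value)


store = {}
print(handle_on_rhs(store, 123, "subscribe-to-f-k1"))
print(handle_on_rhs(store, 123, DELETE_NO_PROPAGATE))
print(store)
```

Both kinds of delete clear the RHS state; they differ only in whether a reply goes back to the LHS.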
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>> Adam
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> SGTM.
> > >>>>>>
> > >>>>>> I also had the impression that those duplicates are an error rather
> > >>>>>> than a case of eventual consistency. Using hashing to avoid sending the
> > >>>>>> payload is a good idea IMHO.
> > >>>>>>
> > >>>>>> @Adam: can you update the KIP accordingly?
> > >>>>>>
> > >>>>>>  - add the optimization to not send a reply from RHS to LHS on
> > >>>>>> unsubscribe (if not a tombstone)
> > >>>>>>  - explain why using offsets to avoid duplicates does not work
> > >>>>>>  - add hashing to avoid duplicates
> > >>>>>>
> > >>>>>> Besides this, I don't have any further comments. Excited to finally get
> > >>>>>> this in!
> > >>>>>>
> > >>>>>> Let us know when you have updated the KIP so we can move forward with
> > >>>>>> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> This proposal sounds good to me, especially since we observe that people
> > >>>>>>> are already confused when they see duplicate results coming out of 1:1
> > >>>>>>> joins (which is a bug). I take this as "evidence" that we're better off
> > >>>>>>> eliminating those duplicates from the start. Guozhang's proposal seems
> > >>>>>>> like a lightweight solution to the problem, so FWIW, I'm in favor.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> -John
> > >>>>>>>
> > >>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Guozhang
> > >>>>>>>>
> > >>>>>>>> That would certainly work for eliminating those duplicate values. As it
> > >>>>>>>> stands right now, this would be consistent with swallowing changes due to
> > >>>>>>>> out-of-order processing with multiple threads, and seems like a very
> > >>>>>>>> reasonable way forward. Thank you for the suggestion!
> > >>>>>>>>
> > >>>>>>>> I have been trying to think if there are any other scenarios where we can
> > >>>>>>>> end up with duplicates, though I have not been able to identify any others
> > >>>>>>>> at the moment. I will think on it a bit more, but if anyone else has any
> > >>>>>>>> ideas, please chime in.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Adam
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first I want to separate
> > >>>>>>>>> this issue from the more general issue that today (not only foreign-key,
> > >>>>>>>>> but also co-partitioned primary-key) table-table joins are still not
> > >>>>>>>>> strictly respecting the timestamp ordering, since the two changelog
> > >>>>>>>>> streams may be fetched and hence processed out-of-order, and we do not yet
> > >>>>>>>>> allow a record to be joined with the other table at any given time
> > >>>>>>>>> snapshot. So ideally, when there are two changelog records
> > >>>>>>>>> (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left hand table and one
> > >>>>>>>>> record (f-k1, v3) at the right hand table, depending on the processing
> > >>>>>>>>> ordering we may get:
> > >>>>>>>>>
> > >>>>>>>>> (k1, (f-k1, v2-v3))
> > >>>>>>>>>
> > >>>>>>>>> or
> > >>>>>>>>>
> > >>>>>>>>> (k1, (f-k1, v1-v3))
> > >>>>>>>>> (k1, (f-k1, v2-v3))
> > >>>>>>>>>
> > >>>>>>>>> And this is not to be addressed by this KIP.
> > >>>>>>>>>
> > >>>>>>>>> What I would advocate is to fix the issue that is introduced in this KIP
> > >>>>>>>>> alone, that is, we may have
> > >>>>>>>>>
> > >>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> > >>>>>>>>> (k1, (f-k1, v2-v3))
> > >>>>>>>>>
> > >>>>>>>>> I admit that it does not have a correctness issue from the semantics
> > >>>>>>>>> alone, compared with "discarding the first result", but it may be
> > >>>>>>>>> confusing to users who do not expect to see the seeming duplicates.
> > >>>>>>>>> On the other hand, I think there's a light solution to avoid it, which is
> > >>>>>>>>> that we can still optimize away sending the full payload of "v1" from
> > >>>>>>>>> left hand side to right hand side, but instead of just trimming off the
> > >>>>>>>>> whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm using MD5
> > >>>>>>>>> here just as an example, we can definitely replace it with other
> > >>>>>>>>> functions). By doing this we can discard the join operation if the hash
> > >>>>>>>>> value sent back from the right hand side no longer matches the left hand
> > >>>>>>>>> side, i.e. we will only send:
> > >>>>>>>>>
> > >>>>>>>>> (k1, (f-k1, v2-v3))
> > >>>>>>>>>
> > >>>>>>>>> downstream once.
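A minimal sketch of the hash check described above, assuming the LHS sends only a hash of its value and the RHS echoes it back. The `LeftTable` class, its message fields, and the `simulate` helper are hypothetical, and SHA-256 stands in for MD5 (the proposal treats the hash function as replaceable).

```python
import hashlib


def value_hash(value: str) -> str:
    # Any stable hash works here; SHA-256 is used instead of MD5 for this sketch.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


class LeftTable:
    """Hypothetical LHS: keeps the latest (foreign_key, value) per key."""

    def __init__(self):
        self.rows = {}

    def update(self, key, foreign_key, value):
        self.rows[key] = (foreign_key, value)
        # Only the hash travels to the RHS, not the full payload.
        return {"key": key, "foreign_key": foreign_key, "hash": value_hash(value)}

    def on_response(self, key, echoed_hash, right_value):
        row = self.rows.get(key)
        if row is None:
            return None
        foreign_key, value = row
        if value_hash(value) != echoed_hash:
            # The LHS value changed since this subscription was sent: drop it.
            return None
        return (key, (foreign_key, f"{value}-{right_value}"))


def simulate():
    left = LeftTable()
    right = {"f-k1": "v3"}
    # Both LHS updates are sent before either response comes back.
    subs = [left.update("k1", "f-k1", "v1"), left.update("k1", "f-k1", "v2")]
    outputs = []
    for sub in subs:
        result = left.on_response(sub["key"], sub["hash"], right[sub["foreign_key"]])
        if result is not None:
            outputs.append(result)
    return outputs


print(simulate())
```

In this run the response for "v1" is discarded because its hash no longer matches the current LHS value, so only the v2-v3 result is emitted.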
> > >>>>>>>>>
> > >>>>>>>>> WDYT?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Ah yes, I recall it all now. That answers that question as to why I had
> > >>>>>>>>>> caching disabled. I can certainly re-enable it since I believe the main
> > >>>>>>>>>> concern was simply about reconciling those two iterators. A lack of
> > >>>>>>>>>> knowledge there on my part.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Thank you John for weighing in - we certainly both do appreciate it. I
> > >>>>>>>>>> think that John hits it on the head though with his comment of "If it
> > >>>>>>>>>> turns out we're wrong about this, then it should be possible to fix the
> > >>>>>>>>>> semantics in place, without messing with the API."
> > >>>>>>>>>>
> > >>>>>>>>>> If anyone else would like to weigh in, your thoughts would be greatly
> > >>>>>>>>>> appreciated.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>>>> I don't know how to range scan over a caching store, probably one
> > >>>>>>>>>>>>> had to open 2 iterators and merge them.
> > >>>>>>>>>>>
> > >>>>>>>>>>> That happens automatically. If you query a cached KTable, it ranges over
> > >>>>>>>>>>> the cache and the underlying RocksDB and performs the merging under the
> > >>>>>>>>>>> hood.
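Conceptually, the merged range scan can be sketched like this. It is a simplified illustration and not the actual Kafka Streams implementation: plain dicts stand in for the cache and for RocksDB, `merged_range` is a hypothetical name, bounds are assumed inclusive, and a cached `None` is assumed to mean a pending delete.

```python
def merged_range(cache: dict, store: dict, lo, hi):
    """Yield (key, value) pairs for lo <= key <= hi, in key order.

    Cache entries shadow store entries for the same key; a cached None
    hides the key entirely (assumed to represent a not-yet-flushed delete).
    """
    keys = sorted(k for k in set(cache) | set(store) if lo <= k <= hi)
    for k in keys:
        value = cache[k] if k in cache else store[k]
        if value is not None:
            yield k, value


store = {"a": "A", "b": "B", "c": "C", "d": "D"}
cache = {"b": "B2", "c": None}  # newer value for b, pending delete for c
print(list(merged_range(cache, store, "a", "c")))
```

The caller sees one ordered view, which includes cached values that have not yet been flushed to the underlying store.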
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> Other than that, I still think even the regular join is broken with
> > >>>>>>>>>>>>> caching enabled right?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Why? To me, if you use the word "broken", it implies conceptually
> > >>>>>>>>>>> incorrect; I don't see this.
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> I once filed a ticket, because with caching
> > >>>>>>>>>>>>> enabled it would return values that haven't been published downstream
> > >>>>>>>>>>>>> yet.
> > >>>>>>>>>>>
> > >>>>>>>>>>> For the bug report, I found
> > >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
> > >>>>>>>>>>> this, but it is a regular bug like any other, and we should not change a
> > >>>>>>>>>>> design because of a bug.
> > >>>>>>>>>>>
> > >>>>>>>>>>> That range() returns values that have not been published downstream if
> > >>>>>>>>>>> caching is enabled is how caching works and is intended behavior. Not
> > >>>>>>>>>>> sure why you say it's incorrect?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
> > >>>>>>>>>>>>> Thanks Adam,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is no
> > >>>>>>>>>>>>> functional/correctness requirement to disable caching. I cannot
> > >>>>>>>>>>>>> remember why Jan's proposal added this? It might be an
> > >>>>>>>>>>>>> implementation detail though (maybe just remove it from the KIP?
> > >>>>>>>>>>>>> -- might be misleading).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don't know how to range scan over a caching store, probably one had
> > >>>>>>>>>>>> to open 2 iterators and merge them.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Other than that, I still think even the regular join is broken with
> > >>>>>>>>>>>> caching enabled right? I once filed a ticket, because with caching
> > >>>>>>>>>>>> enabled it would return values that haven't been published downstream
> > >>>>>>>>>>>> yet.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>
> > >
> >
> >
>


-- 
-- Guozhang
