Handling the "changelog" topic for the buffer in the same way as
repartition topics makes sense. Thanks for clearing that up!
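
For reference, here is a rough sketch of how I picture the purge working. It is a toy Python model, not Kafka Streams code: `BufferChangelog` and `safe_purge_offset` are made-up names, and the rule that only the prefix below the oldest still-buffered offset may be deleted is my own assumption, since the buffer evicts by timestamp rather than by offset.

```python
from dataclasses import dataclass, field


@dataclass
class BufferChangelog:
    """Toy stand-in for one partition of a non-compacted buffer changelog."""
    records: dict = field(default_factory=dict)  # offset -> (key, timestamp, value)
    next_offset: int = 0

    def append(self, key, timestamp, value):
        """Append a buffered record and return the offset it was written at."""
        offset = self.next_offset
        self.records[offset] = (key, timestamp, value)
        self.next_offset += 1
        return offset

    def delete_before(self, offset):
        """Drop every record below `offset`, as a repartition-topic purge would."""
        self.records = {o: r for o, r in self.records.items() if o >= offset}


def safe_purge_offset(buffered_offsets, end_offset):
    """Lowest changelog offset that still backs a record held in the buffer.

    Assumption: the buffer evicts in timestamp order, not offset order, so
    only the prefix below the oldest still-buffered offset can be deleted.
    """
    return min(buffered_offsets, default=end_offset)


log = BufferChangelog()
in_buffer = {}  # changelog offset -> timestamp of the buffered record
for key, ts in [("a", 5), ("a", 3), ("b", 7)]:
    in_buffer[log.append(key, ts, f"{key}@{ts}")] = ts

# Evict the record with ts=3 (offset 1). Offset 0 is still buffered, so
# nothing can be purged yet.
del in_buffer[1]
log.delete_before(safe_purge_offset(in_buffer, log.next_offset))
after_first_eviction = sorted(log.records)

# Evict the record with ts=5 (offset 0). Everything below offset 2 can go.
del in_buffer[0]
log.delete_before(safe_purge_offset(in_buffer, log.next_offset))
after_second_eviction = sorted(log.records)
```

Both records with key "a" live side by side in the log, which is the behavior I was asking about. Note that after the first eviction the record at offset 1 is still in the topic, so a restore would see it again; how to skip it is outside this sketch.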

- Victoria

On Tue, Jun 6, 2023 at 10:22 AM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

> Good point, Victoria. I just removed the compacted topic mention from the
> KIP. I agree with Bruno about using a normal topic and deleting records
> that have been processed.
>
> On Tue, Jun 6, 2023 at 2:28 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi,
> >
> > another idea that came to my mind. Instead of using a compacted topic,
> > the buffer could use a non-compacted topic and regularly delete records
> > before a given offset as Streams does for repartition topics.
> >
> > Best,
> > Bruno
> >
> > On 05.06.23 21:48, Bruno Cadonna wrote:
> > > Hi Victoria,
> > >
> > > that is a good point!
> > >
> > > I think, the topic needs to be a compacted topic to be able to get rid
> > > of records that are evicted from the buffer. So the key might be
> > > something with the key, the timestamp, and a sequence number to
> > > distinguish between records with the same key and same timestamp.
> > >
> > > Just an idea! Maybe Walker comes up with something better.
> > >
> > > Best,
> > > Bruno
> > >
> > > On 05.06.23 20:38, Victoria Xia wrote:
> > >> Hi Walker,
> > >>
> > >> Thanks for the latest updates! The KIP looks great. Just one question
> > >> about
> > >> the changelog topic for the join buffer: The KIP says "When a failure
> > >> occurs the buffer will try to recover from an OffsetCheckpoint if
> > >> possible.
> > >> If not it will reload the buffer from a compacted change-log topic."
> > This
> > >> is a new changelog topic that will be introduced specifically for the
> > >> join
> > >> buffer, right? Why is the changelog topic compacted? What are the
> keys?
> > I
> > >> am confused because the buffer contains records from the stream-side
> > >> of the
> > >> join, for which multiple records with the same key should be treated as
> > >> separate updates which must all be tracked in the buffer, rather than
> > >> updates which replace each other.
> > >>
> > >> Thanks,
> > >> Victoria
> > >>
> > >> On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org>
> > wrote:
> > >>
> > >>> Hi Walker,
> > >>>
> > >>> Thanks once more for the updates to the KIP!
> > >>>
> > >>> Do you also plan to expose metrics for the buffer?
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 02.06.23 17:16, Walker Carlson wrote:
> > >>>> Hello Bruno,
> > >>>>
> > >>>> I think this covers your questions. Let me know what you think
> > >>>>
> > >>>> 2.
> > >>>> We can use a changelog topic. I think we can treat it like any other
> > >>> store
> > >>>> and recover in the usual manner. Also, the implementation is on disk.
> > >>>>
> > >>>> 3.
> > >>>> The description is in the public interfaces description. I will copy
> > it
> > >>>> into the proposed changes as well.
> > >>>>
> > >>>> This is a bit of an implementation detail that I didn't want to add
> > >>>> into
> > >>>> the kip, but the record will be added to the buffer to keep the
> stream
> > >>> time
> > >>>> consistent, it will just be ejected immediately. Of course, if this
> > >>>> causes performance issues we will skip this step and track stream time
> > >>>> separately. I will update the kip to say that stream time advances
> > >>>> when a
> > >>>> stream record enters the node.
> > >>>>
> > >>>> Also, yes, updated.
> > >>>>
> > >>>> 5.
> > >>>> No there is no difference right now, everything gets processed as it
> > >>> comes
> > >>>> in and tries to find a record for its time stamp.
> > >>>>
> > >>>> Walker
> > >>>>
> > >>>> On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Walker,
> > >>>>>
> > >>>>> Thanks for the updates!
> > >>>>>
> > >>>>> 2.
> > >>>>> It is still not clear to me how a failure is handled. I do not
> > >>>>> understand what you mean by "recover from an OffsetCheckpoint".
> > >>>>>
> > >>>>> My understanding is that the buffer needs to be replicated into its
> > >>>>> own
> > >>>>> Kafka topic. The input topic is not enough. The offset of a record
> is
> > >>>>> added to the offsets to commit once the record is streamed through
> > the
> > >>>>> subtopology. That means once the record is added to the buffer its
> > >>>>> offset is added to the offsets to commit -- independently of
> > >>>>> whether the
> > >>>>> record was evicted from the buffer and sent to the join node or
> not.
> > >>>>> Now, let's assume the following scenario
> > >>>>> 1. a record is read from the input topic and added to the buffer,
> but
> > >>>>> not evicted to be processed by the join node.
> > >>>>> 2. When the processing of the subtopology finishes the offset of
> the
> > >>>>> record is added to the offsets to commit.
> > >>>>> 3. A commit happens.
> > >>>>> 4. A failure happens
> > >>>>>
> > >>>>> After the failure the buffer is empty but the record will not be
> read
> > >>>>> anymore from the input topic since its offset has been already
> > >>>>> committed. The record is lost.
> > >>>>> One solution to avoid the loss is to recreate the buffer from a
> > >>>>> compacted Kafka topic as we do for suppression buffers. I do not
> > >>>>> think,
> > >>>>> we need any offset checkpoint here since, we keep the buffer in
> > >>>>> memory,
> > >>>>> right? Or do you plan to back the buffer with a persistent store?
> > Even
> > >>>>> in that case, a compacted Kafka topic would be needed.
> > >>>>>
> > >>>>>
> > >>>>> 3.
> > >>>>>    From the KIP it is still not clear to me what happens if a
> > >>>>> record is
> > >>>>> outside of the grace period. I guess the record that falls outside
> of
> > >>>>> the grace period will not be added to the buffer, but will be sent to
> > >>>>> the join node. Since it is outside of the grace period it will also
> > >>>>> not
> > >>>>> increase stream time and it will not trigger an eviction. Also the
> > >>>>> head
> > >>>>> of the buffer will not contain a record that needs to be evicted
> > since
> > >>>>> the timestamp of the head record will be within the interval
> > >>>>> stream
> > >>>>> time minus grace period. Is this correct? Please add such a
> > >>>>> description
> > >>>>> to the KIP.
> > >>>>> Furthermore, I think there is a mistake in the text:
> > >>>>> "... will dequeue when the record timestamp is greater than stream
> > >>>>> time
> > >>>>> plus the grace period". I guess that should be "... will dequeue
> when
> > >>>>> the record timestamp is less than (or equal?) stream time minus the
> > >>>>> grace period"
> > >>>>>
> > >>>>>
> > >>>>> 5.
> > >>>>> What is the difference between not setting the grace period and
> > >>>>> setting
> > >>>>> it to zero? If there is a difference, why is there a difference?
> > >>>>>
> > >>>>>
> > >>>>> Best,
> > >>>>> Bruno
> > >>>>>
> > >>>>>
> > >>>>> On 01.06.23 23:58, Walker Carlson wrote:
> > >>>>>> Hey Bruno thanks for the feedback.
> > >>>>>>
> > >>>>>> 1)
> > >>>>>> I will add this to the kip, but stream time only advances when the
> > >>>>>> buffer receives a new record.
> > >>>>>>
> > >>>>>> 2)
> > >>>>>> You are correct, I will add a failure section on to the kip. Since
> > >>>>>> the
> > >>>>>> records won't change in the buffer from when they are read from the
> > >>> topic
> > >>>>>> they are replicated already.
> > >>>>>>
> > >>>>>> 3)
> > >>>>>> I see that I'm out voted on the dropping of records thing. We will
> > >>>>>> pass
> > >>>>>> them on and try to join them if possible. This might cause some
> null
> > >>>>>> results, but increasing the table history retention should help
> > that.
> > >>>>>>
> > >>>>>> 4)
> > >>>>>> I can add some to the kip. But it's pretty directly adding whatever
> > >>>>>> the grace period is to the latency. I don't see a way around it.
> > >>>>>>
> > >>>>>> Walker
> > >>>>>>
> > >>>>>> On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org>
> > >>> wrote:
> > >>>>>>
> > >>>>>>> Hi Walker,
> > >>>>>>>
> > >>>>>>> thanks for the KIP!
> > >>>>>>>
> > >>>>>>> Here my feedback:
> > >>>>>>>
> > >>>>>>> 1.
> > >>>>>>> It is still not clear to me when stream time for the buffer
> > >>>>>>> advances.
> > >>>>>>> What is the event that lets the stream time advance? In the
> > >>> discussion, I
> > >>>>>>> do not understand what you mean by "The segment store already has
> > an
> > >>>>>>> observed stream time, we advance based on that. That should only
> > >>> advance
> > >>>>>>> based on records that enter the store." Where does this segment
> > >>>>>>> store
> > >>>>>>> come from? Anyways, I think it would be great to also state how
> > >>>>>>> stream
> > >>>>>>> time advances in the KIP.
> > >>>>>>>
> > >>>>>>> 2.
> > >>>>>>> How does the buffer behave in case of a failure? I think I
> > >>>>>>> understand
> > >>>>>>> that the buffer will use an implementation of
> > >>> TimeOrderedKeyValueBuffer
> > >>>>>>> and therefore the records in the buffer will be replicated to a
> > >>>>>>> topic
> > >>> in
> > >>>>>>> Kafka, but I am not completely sure. Could you elaborate on this
> in
> > >>> the
> > >>>>>>> KIP?
> > >>>>>>>
> > >>>>>>> 3.
> > >>>>>>> I agree with Matthias about dropping late records. We use grace
> > >>> periods
> > >>>>>>> in scenarios where records are grouped, like in windowed
> > >>> aggregations
> > >>>>>>> and windowed joins. The stream buffer you propose does not really
> > >>> group
> > >>>>>>> any records. It rather delays records and reorders them. I am not
> > >>>>>>> sure
> > >>>>>>> if grace period is the right naming/concept to apply here.
> > >>>>>>> Instead of
> > >>>>>>> dropping records that fall outside of the buffer's time interval
> > the
> > >>>>>>> join should skip the buffer and try to join the record
> > >>>>>>> immediately. In
> > >>>>>>> the end, a stream-table join is an unwindowed join, i.e., no
> > grouping
> > >>> is
> > >>>>>>> applied to the records.
> > >>>>>>> What do you and other folks think about this proposal?
> > >>>>>>>
> > >>>>>>> 4.
> > >>>>>>> How does the proposed buffer affect processing latency? Could
> you
> > >>>>>>> please add some words about this to the KIP?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Bruno
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 31.05.23 01:49, Walker Carlson wrote:
> > >>>>>>>> Thanks for all the additional comments. I will either address
> them
> > >>> here
> > >>>>>>> or
> > >>>>>>>> update the kip accordingly.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> I mentioned a follow-up kip to add extra features before and in the
> > >>>>>>> responses.
> > >>>>>>>> I will try to briefly summarize what options and optimizations I
> > >>>>>>>> plan
> > >>>>> to
> > >>>>>>>> include. If a concern is not covered in this list I will for sure talk
> > >>>>>>>> about it below.
> > >>>>>>>>
> > >>>>>>>> * Allowing non versioned tables to still use the stream buffer
> > >>>>>>>> * Automatically materializing tables instead of forcing the user
> > to
> > >>> do
> > >>>>> it
> > >>>>>>>> * Configurable for in memory buffer
> > >>>>>>>> * Order the records in offset order or in time order
> > >>>>>>>> * Non memory use buffer (offset order, delayed pull from
> stream.)
> > >>>>>>>> * Time synced between stream and table side (maybe)
> > >>>>>>>> * Do not drop late records and process them as they come in
> > >>>>>>>> instead.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> First, Victoria.
> > >>>>>>>>
> > >>>>>>>> 1) (One of your nits covers this, but you are correct it doesn't
> > >>>>>>>> make
> > >>>>>>>> sense. so I removed that part of the example.)
> > >>>>>>>> For those examples with the "bad" join results I said without
> > >>> buffering
> > >>>>>>> the
> > >>>>>>>> stream it would look like that, but that was incomplete. If the
> > >>>>>>>> look
> > >>> up
> > >>>>>>> was
> > >>>>>>>> simply looking at the latest version of the table when the
> stream
> > >>>>> records
> > >>>>>>>> came in then the results were possible. If we are using the
> > >>>>>>>> point in
> > >>>>> time
> > >>>>>>>> lookup that versioned tables let us then you are correct the
> > future
> > >>>>>>> results
> > >>>>>>>> are not possible.
> > >>>>>>>>
> > >>>>>>>> 2) I'll get to this later as Matthias brought up something
> > related.
> > >>>>>>>>
> > >>>>>>>> To your additional thoughts, I agree that we need to call those
> > >>> things
> > >>>>>>> out
> > >>>>>>>> in the documentation. I'm writing up a follow up kip with a lot
> of
> > >>> the
> > >>>>>>>> ideas we have discussed so that we can improve this feature
> beyond
> > >>> the
> > >>>>>>> base
> > >>>>>>>> implementation if it's needed.
> > >>>>>>>>
> > >>>>>>>> I addressed the nits in the kip. I somehow missed the table
> stream
> > >>>>> table
> > >>>>>>>> join processor improvement, it makes your first question make a
> > lot
> > >>>>> more
> > >>>>>>>> sense.  Table history retention is a much cleaner way to
> > >>>>>>>> describe it.
> > >>>>>>>>
> > >>>>>>>> As to your mention of the syncing the time for the table and
> > >>>>>>>> stream.
> > >>>>>>>> Matthias mentioned that as well. I will address both here. I
> > >>>>>>>> plan to
> > >>>>>>> bring
> > >>>>>>>> that up in the future, but for now we will leave it out. I
> > >>>>>>>> suppose it
> > >>>>>>> will
> > >>>>>>>> be more useful after the table history retention is separable
> from
> > >>> the
> > >>>>>>>> table grace period.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> To address Matthias comments.
> > >>>>>>>>
> > >>>>>>>> You are correct by saying the in memory store shouldn't cause
> any
> > >>>>>>> semantic
> > >>>>>>>> concerns. My concern would be more with if we limited the number
> > of
> > >>>>>>> records
> > >>>>>>>> on the buffer and what we would do if we hit said limits,
> > (emitting
> > >>>>> those
> > >>>>>>>> records might be an issue, throwing an error and halting would
> > >>>>>>>> not).
> > >>> I
> > >>>>>>>> think we can leave this discussion to the follow up kip along
> > >>>>>>>> with a
> > >>>>> few
> > >>>>>>>> other options.
> > >>>>>>>>
> > >>>>>>>> I will go through your proposals now.
> > >>>>>>>>
> > >>>>>>>>       - don't support non-versioned KTables
> > >>>>>>>>
> > >>>>>>>> Sure, we can always expand this later on. Will include as part of the
> > >>>>>>>> improvement kip.
> > >>>>>>>>
> > >>>>>>>>       - if grace period is added, users need to explicitly
> > >>>>>>>> materialize
> > >>>>> the
> > >>>>>>>> table as version (either directly, or upstream. Upstream only
> > works
> > >>> if
> > >>>>>>>> downstream tables "inherit" versioned semantics -- cf KIP-914)
> > >>>>>>>>
> > >>>>>>>> again, that works for me for now, if we find a use we can always
> > >>>>>>>> add
> > >>>>>>> later.
> > >>>>>>>>
> > >>>>>>>>       - the table's history retention time must be larger than
> the
> > >>> grace
> > >>>>>>>> period (should be easy to check at runtime, when we build the
> > >>> topology)
> > >>>>>>>>
> > >>>>>>>> agreed
> > >>>>>>>>
> > >>>>>>>>       - because switching from non-versioned to versioned stores is not
> > >>>>>>>> backward compatible (cf KIP-914), users need to take care of
> this
> > >>>>>>>> themselves, and this also implies that adding grace period is
> not
> > a
> > >>>>>>>> backward compatible change (even only if via indirect means)
> > >>>>>>>>
> > >>>>>>>> sure, this works
> > >>>>>>>>
> > >>>>>>>> As to the dropping of late records, I'm not sure. On one hand I like not
> > >>>>>>>> dropping things. But on the other I struggle to see how a user
> can
> > >>>>> filter
> > >>>>>>>> out late records that might have incomplete join results. The
> > point
> > >>> in
> > >>>>>>> time
> > >>>>>>>> look up will aggressively expire old data and if new data has
> been
> > >>>>>>> replaced
> > >>>>>>>> it will return null if outside of the retention. This seems like
> > it
> > >>>>> could
> > >>>>>>>> corrupt the integrity of the join output. Seeing that we drop
> late
> > >>>>>>> records
> > >>>>>>>> on the table side as well I would think it makes sense to drop
> > late
> > >>>>>>> records
> > >>>>>>>> on the stream buffer. I could be convinced otherwise I suppose,
> I
> > >>> could
> > >>>>>>> see
> > >>>>>>>> adding this as an option in a follow up kip. It would be very
> > >>>>>>>> easy to
> > >>>>>>>> implement either way. For now unless no one else objects I'm
> > >>>>>>>> going to
> > >>>>>>> stick
> > >>>>>>>> with dropping the records for the sake of getting this kip
> > >>>>>>>> passed. It
> > >>>>> is
> > >>>>>>>> functionally a small change to make and we can update later if
> you
> > >>> feel
> > >>>>>>>> strongly about it.
> > >>>>>>>>
> > >>>>>>>> For the ordering. I have to say that it would be more
> > >>>>>>>> complicated to
> > >>>>>>>> implement it to be in offset order, if the goal is to get as many of the
> > >>>>>>>> records validly joined as possible. Because we would process as
> > >>> things
> > >>>>>>> left
> > >>>>>>>> the buffer, a sufficiently early record could hold up records that
> > >>>>>>>> would otherwise be valid past the table history retention. To
> fix
> > >>> this
> > >>>>> we
> > >>>>>>>> could process by timestamp then store in a second queue and emit
> > by
> > >>>>>>> offset,
> > >>>>>>>> but that would be a lot more complicated. If we didn't care
> > >>>>>>>> about not
> > >>>>>>>> missing some valid joins we could just have no store and pull
> from
> > >>> the
> > >>>>>>>> topic at a delay only caring about the timestamp of the next
> > >>>>>>>> offset.
> > >>>>> For
> > >>>>>>>> now I want to stick with the timestamp ordering as it makes much
> > >>>>>>>> more
> > >>>>>>> sense
> > >>>>>>>> to me, but would propose we add both of the other options I have
> > >>>>>>>> laid
> > >>>>> out
> > >>>>>>>> here in the follow up kip.
> > >>>>>>>>
> > >>>>>>>> Lastly, I think having an empty store with zero grace period
> > >>>>>>>> would be
> > >>>>>>> super
> > >>>>>>>> simple and not costly, so we might as well make it even if
> nothing
> > >>> gets
> > >>>>>>>> entered.
> > >>>>>>>>
> > >>>>>>>> I hope that addresses all your concerns,
> > >>>>>>>>
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <
> mj...@apache.org
> > >
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Walker,
> > >>>>>>>>>
> > >>>>>>>>> thanks for the updates. The KIP itself reads fine (of course
> > >>> Victoria
> > >>>>>>>>> made good comments about some phrases), but there is a couple
> of
> > >>>>> things
> > >>>>>>>>> from your latest reply I don't understand, and that I still
> think
> > >>> need
> > >>>>>>>>> some more discussions.
> > >>>>>>>>>
> > >>>>>>>>> Lucas asked about an in-memory option and `WindowStoreSupplier`
> and
> > >>> you
> > >>>>>>>>> mention "semantic concerns". There should not be any semantic
> > >>>>> difference
> > >>>>>>>>> from the underlying buffer implementation, so I am not sure
> > >>>>>>>>> what you
> > >>>>>>>>> mean here (also the relationship to suppress() is unclear to
> me)?
> > >>> -- I
> > >>>>>>>>> am ok to not make it configurable for now. We can always do it
> > >>>>>>>>> via a
> > >>>>>>>>> follow up KIP, and keep interface changes limited for now.
> > >>>>>>>>>
> > >>>>>>>>> Does it really make sense to allow a grace period if the table
> is
> > >>>>>>>>> non-versioned? You also say: "If table is not materialized it
> > will
> > >>>>>>>>> materialize it as versioned." -- What history retention time
> > would
> > >>> we
> > >>>>>>>>> pick for this case (also asked by Victoria)? Or should we
> > >>>>>>>>> rather not
> > >>>>>>>>> support this and force the user to materialize the table
> > >>>>>>>>> explicitly,
> > >>>>> and
> > >>>>>>>>> thus explicitly picking a history retention time? It's a tradeoff between
> > >>>>>>>>> usability and guiding users that there will be a significant impact on
> > >>>>>>>>> disk usage. There are also compatibility concerns: If the table
> is
> > >>> not
> > >>>>>>>>> explicitly materialized in the old program, we would already
> > >>>>>>>>> need to
> > >>>>>>>>> materialize it also in the old program (of course, we would
> use a
> > >>>>>>>>> non-versioned store so far). Thus, if somebody adds a grace
> > >>>>>>>>> period,
> > >>> we
> > >>>>>>>>> cannot just switch the store type, as it would be a breaking
> > >>>>>>>>> change,
> > >>>>>>>>> potentially requiring an application reset, or following the
> > >>>>>>>>> upgrade
> > >>>>>>>>> path for versioned state stores, and also changing the program
> to
> > >>>>>>>>> explicitly materialize using a versioned store. Also note, that
> > we
> > >>>>> might
> > >>>>>>>>> not materialize the actual join table, but only an upstream
> > table,
> > >>> and
> > >>>>>>>>> use `ValueGetter` to access the upstream data.
> > >>>>>>>>>
> > >>>>>>>>> To this end, as you already mentioned, history retention of the
> > >>> table
> > >>>>>>>>> should be at least grace period. You proposed to include this
> in
> > a
> > >>>>>>>>> follow up KIP, but I am wondering if it's a fundamental
> > >>>>>>>>> requirement
> > >>>>> and
> > >>>>>>>>> thus we should put a check in place right away and reject an
> > >>>>>>>>> invalid
> > >>>>>>>>> configuration? (It's always easier to lift restrictions than to
> > >>> introduce
> > >>>>>>>>> them later.) This would also imply that a non-versioned table
> > >>>>>>>>> cannot
> > >>>>> be
> > >>>>>>>>> supported, because it does not have a history retention that is
> > >>> larger
> > >>>>>>>>> than grace period, and maybe also answer the requirement about
> > >>>>>>>>> materialization: as we already always materialize something on
> > the
> > >>>>>>>>> table side as a non-versioned store right now, it seems
> > >>>>>>>>> difficult to
> > >>>>>>>>> migrate the store to a versioned store. Ie, it might be ok to
> > push
> > >>> the
> > >>>>>>>>> burden onto the user and say: if you start using grace period,
> > you
> > >>>>> also
> > >>>>>>>>> need to manually switch from non-versioned to versioned KTables. Doing
> > >>>>>>>>> stuff automatically under the hood is very complex for this case, and if
> > >>>>>>>>> we push the burden onto the user, it might be ok to not
> > complicate
> > >>>>> this
> > >>>>>>>>> KIP significantly.
> > >>>>>>>>>
> > >>>>>>>>> To summarize the last two paragraphs, I would propose to:
> > >>>>>>>>>       - don't support non-versioned KTables
> > >>>>>>>>>       - if grace period is added, users need to explicitly
> > >>> materialize
> > >>>>> the
> > >>>>>>>>> table as version (either directly, or upstream. Upstream only
> > >>>>>>>>> works
> > >>> if
> > >>>>>>>>> downstream tables "inherit" versioned semantics -- cf KIP-914)
> > >>>>>>>>>       - the table's history retention time must be larger than
> > the
> > >>> grace
> > >>>>>>>>> period (should be easy to check at runtime, when we build the
> > >>>>> topology)
> > >>>>>>>>>       - because switching from non-versioned to versioned stores is not
> > >>>>>>>>> backward compatible (cf KIP-914), users need to take care of
> this
> > >>>>>>>>> themselves, and this also implies that adding grace period is
> > >>>>>>>>> not a
> > >>>>>>>>> backward compatible change (even only if via indirect means)
> > >>>>>>>>>
> > >>>>>>>>> About dropping late records: wondering if we should never drop
> a
> > >>>>>>>>> stream-side record for a left-join, even if it's late? In
> > general,
> > >>> one
> > >>>>>>>>> thing I observed over the years is, that it's easier to keep
> > stuff
> > >>> and
> > >>>>>>>>> let users filter explicitly downstream (or make it
> configurable),
> > >>>>>>>>> instead of dropping pro-actively, because users have no good
> > >>>>>>>>> way to
> > >>>>>>>>> resurrect records that already got dropped.
> > >>>>>>>>>
> > >>>>>>>>> For ordering, it sounds reasonable to me to only start with one
> > >>>>>>>>> implementation, and maybe make it configurable as a follow up.
> > >>>>> However,
> > >>>>>>>>> I am wondering if starting with offset order might be the
> better
> > >>>>> option
> > >>>>>>>>> as it seems to align more with what we do so far? So instead of
> > >>>>> storing
> > >>>>>>>>> records ordered by timestamp, we can just store them ordered by offset,
> > >>>>>>>>> and still "poll" from the buffer based on the head record's timestamp. Or
> > >>>>>>>>> would this complicate the implementation significantly?
> > >>>>>>>>>
> > >>>>>>>>> I also think it's ok to not "sync" stream-time between the
> > >>>>>>>>> table and
> > >>>>> the
> > >>>>>>>>> stream in this KIP, but we should consider doing this as a
> > >>>>>>>>> follow up
> > >>>>>>>>> change (not sure if we would need a KIP or not for a change like this).
> > >>>>>>>>>
> > >>>>>>>>> About increasing/decreasing grace period: what you describe makes sense
> > >>>>>>>>> to me. If decreased, the next record would just trigger
> emitting
> > a
> > >>> lot
> > >>>>>>>>> of records, and for increase, the buffer would just need to
> "fill
> > >>> up"
> > >>>>>>>>> again. For reprocessing getting a different result with a
> > >>>>>>>>> different
> > >>>>>>>>> grace period is expected, so that's ok IMHO. -- There seems to
> be
> > >>> one
> > >>>>>>>>> special corner case: grace period zero. For this case, we
> > actually
> > >>>>> don't
> > >>>>>>>>> need any store, and the stream-side could be stateless. I think
> > it
> > >>> can
> > >>>>>>>>> have the same behavior, but if we want to "add / remove" the
> > store
> > >>>>>>>>> dynamically, we need to add specific code for it. For example,
> > >>>>>>>>> even
> > >>> if
> > >>>>>>>>> we start up with a grace period of zero, we would need to check
> > if
> > >>>>> there
> > >>>>>>>>> is a local store, and still emit everything in it, before we
> can
> > >>> ditch
> > >>>>>>>>> the store (not sure if that's even easily done at all). Or: we
> > >>>>>>>>> would
> > >>>>>>>>> need to have a store for _all_ cases, even if grace period is
> > zero
> > >>>>> (the
> > >>>>>>>>> store would be empty all the time though), to avoid super
> complex
> > >>>>> code?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 5/25/23 10:53 AM, Lucas Brutschy wrote:
> > >>>>>>>>>> Hi Walker,
> > >>>>>>>>>>
> > >>>>>>>>>> thanks for your responses. That makes sense. I guess there is
> > >>> always
> > >>>>>>>>>> the option to make the implementation more configurable later
> > on,
> > >>> if
> > >>>>>>>>>> users request it. Also thanks for the clarifications. From my
> > >>>>>>>>>> side,
> > >>>>>>>>>> the KIP is good to go.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Lucas
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, May 24, 2023 at 11:54 PM Victoria Xia
> > >>>>>>>>>> <victoria....@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the updates, Walker! Looks great, though I do
> have a
> > >>>>> couple
> > >>>>>>>>>>> questions about the latest updates:
> > >>>>>>>>>>>
> > >>>>>>>>>>>         1. The new example says that without stream-side
> > >>>>>>>>>>> buffering,
> > >>>>> "ex"
> > >>>>>>> and
> > >>>>>>>>>>>         "fy" are possible join results. How could those join
> > >>> results
> > >>>>>>>>> happen? The
> > >>>>>>>>>>>         example versioned table suggests that table record
> > >>>>>>>>>>> "x" has
> > >>>>>>>>> timestamp 2, and
> > >>>>>>>>>>>         table record "y" has timestamp 3. If stream record
> > >>>>>>>>>>> "e" has
> > >>>>>>>>> timestamp 1,
> > >>>>>>>>>>>         then it can never be joined against record "x", and
> > >>> similarly
> > >>>>> for
> > >>>>>>>>> stream
> > >>>>>>>>>>>         record "f" with timestamp 2 being joined against "y".
> > >>>>>>>>>>>         2. I see in your replies above that "If table is not
> > >>>>>>> materialized it
> > >>>>>>>>>>>         will materialize it as versioned" but I don't see
> this
> > >>> called
> > >>>>> out
> > >>>>>>>>> in the
> > >>>>>>>>>>>         KIP -- seems worth calling out. Also, what will the
> > >>>>>>>>>>> history
> > >>>>>>>>> retention for
> > >>>>>>>>>>>         the versioned table be? Will it be the same as the
> join
> > >>> grace
> > >>>>>>>>> period, or
> > >>>>>>>>>>>         will it be greater?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> And some additional thoughts:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sounds like there are a few things users should watch out for
> > >>>>>>>>>>> when
> > >>>>>>>>> enabling
> > >>>>>>>>>>> the stream-side buffer:
> > >>>>>>>>>>>
> > >>>>>>>>>>>         - Records will get "stuck" if there are no newer
> > >>>>>>>>>>> records to
> > >>>>>>> advance
> > >>>>>>>>>>>         stream time.
> > >>>>>>>>>>>         - If there are large gaps between the timestamps of
> > >>>>> stream-side
> > >>>>>>>>> records,
> > >>>>>>>>>>>         then it's possible that versioned store history
> > >>>>>>>>>>> retention
> > >>> will
> > >>>>>>> have
> > >>>>>>>>> expired
> > >>>>>>>>>>>         by the time a record is evicted from the join buffer,
> > >>> leading
> > >>>>> to
> > >>>>>>> a
> > >>>>>>>>> join
> > >>>>>>>>>>>         "miss." For example, if the join grace period and
> table
> > >>>>> history
> > >>>>>>>>> retention
> > >>>>>>>>>>>         are both 10, and records come in the order:
> > >>>>>>>>>>>
> > >>>>>>>>>>>         table side t0 with ts=0
> > >>>>>>>>>>>         stream side s1 with ts=1 <-- enters buffer
> > >>>>>>>>>>>         table side t10 with ts=10
> > >>>>>>>>>>>         table side t20 with ts=20
> > >>>>>>>>>>>         stream side s21 with ts=21 <-- evicts record s1 from
> > >>> buffer,
> > >>>>> but
> > >>>>>>>>>>>         versioned store no longer contains data for ts=1 due
> to
> > >>>>> history
> > >>>>>>>>> retention
> > >>>>>>>>>>>         having elapsed
> > >>>>>>>>>>>
> > >>>>>>>>>>>         This will result in the join result (s1, null) even
> > >>>>>>>>>>> though
> > >>> it
> > >>>>>>>>> should've
> > >>>>>>>>>>>         been (s1, t0), due to t0 having been expired from the
> > >>>>> versioned
> > >>>>>>>>> store
> > >>>>>>>>>>>         already.
> > >>>>>>>>>>>         - Out-of-order records from the stream-side will be
> > >>> reordered,
> > >>>>>>> and
> > >>>>>>>>> late
> > >>>>>>>>>>>         records will be dropped.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I don't think any of these are reasons to not go forward with
> > >>>>>>>>>>> this
> > >>>>>>> KIP,
> > >>>>>>>>> but
> > >>>>>>>>>>> it'd be good to call them out in the eventual documentation
> to
> > >>>>>>> decrease
> > >>>>>>>>> the
> > >>>>>>>>>>> chance users get tripped up.
> > >>>>>>>>>>>
> > >>>>>>>>>>>> We could maybe do an improvement later to advance stream
> time
> > >>> from
> > >>>>>>>>> table
> > >>>>>>>>>>> side as well, but that might be debatable as we might get
> more
> > >>> late
> > >>>>>>>>> records.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Yes, the likelihood of late records increases but also the
> > >>>>> likelihood
> > >>>>>>> of
> > >>>>>>>>>>> "join misses" due to versioned store history retention having
> > >>>>> elapsed
> > >>>>>>>>>>> decreases, which feels important for certain use cases.
> Either
> > >>> way,
> > >>>>>>>>> agreed
> > >>>>>>>>>>> that it can be a discussion for the future as incorporating
> > this
> > >>>>> would
> > >>>>>>>>>>> substantially complicate the implementation.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also a couple nits:
> > >>>>>>>>>>>
> > >>>>>>>>>>>         - The KIP currently says "We recently added versioned
> > >>> tables
> > >>>>>>> which
> > >>>>>>>>> allow
> > >>>>>>>>>>>         the table side of the a join [...] but it is not
> taken
> > >>>>> advantage
> > >>>>>>> of
> > >>>>>>>>> in
> > >>>>>>>>>>>         joins," but this doesn't seem true? If the table of a
> > >>>>>>> stream-table
> > >>>>>>>>> join is
> > >>>>>>>>>>>         versioned, then the DSL's stream-table join processor
> > >>>>>>>>>>> will
> > >>>>>>>>> automatically
> > >>>>>>>>>>>         perform timestamped lookups into the table, in order
> to
> > >>> take
> > >>>>>>>>> advantage of
> > >>>>>>>>>>>         the new timestamp-aware store to provide better join
> > >>>>> semantics.
> > >>>>>>>>>>>         - The KIP mentions "grace period" for versioned
> > >>>>>>>>>>> stores in a
> > >>>>>>> number
> > >>>>>>>>> of
> > >>>>>>>>>>>         places but I think you actually mean "history
> > >>>>>>>>>>> retention"?
> > >>> The
> > >>>>> two
> > >>>>>>>>> happen to
> > >>>>>>>>>>>         be the same today (it is not an option for users to
> > >>> configure
> > >>>>> the
> > >>>>>>>>> two
> > >>>>>>>>>>>         separately) but this need not be true in the future.
> > >>> "History
> > >>>>>>>>> retention"
> > >>>>>>>>>>>         governs how far back in time reads may occur, which
> > >>>>>>>>>>> is the
> > >>>>>>> relevant
> > >>>>>>>>>>>         parameter for performing lookups as part of the
> > >>> stream-table
> > >>>>>>> join.
> > >>>>>>>>> "Grace
> > >>>>>>>>>>>         period" in the context of versioned stores refers to
> > how
> > >>> far
> > >>>>> back
> > >>>>>>>>> in time
> > >>>>>>>>>>>         out-of-order writes may occur, which probably isn't
> > >>> directly
> > >>>>>>>>> relevant for
> > >>>>>>>>>>>         introducing a stream-side buffer, though it's also
> > >>>>>>>>>>> possible
> > >>>>> I've
> > >>>>>>>>> overlooked
> > >>>>>>>>>>>         something. (As a bonus, switching from "table grace
> > >>> period" in
> > >>>>>>> the
> > >>>>>>>>> KIP to
> > >>>>>>>>>>>         "table history retention" also helps to
> > >>>>>>>>>>> clarify/distinguish
> > >>>>> that
> > >>>>>>>>> it's a
> > >>>>>>>>>>>         different parameter from the "join grace period,"
> > >>>>>>>>>>> which I
> > >>>>> could
> > >>>>>>> see
> > >>>>>>>>> being
> > >>>>>>>>>>>         confusing to readers. :) )
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Victoria
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, May 18, 2023 at 1:43 PM Walker Carlson
> > >>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hey all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the comments, they gave me a lot to think about.
> > >>>>>>>>>>>> I'll
> > >>>>> try
> > >>>>>>> to
> > > >>>>>>>>>>>> address them all in order. I have made some updates to the
> kip
> > >>>>> related
> > >>>>>>>>> to
> > >>>>>>>>>>>> them, but I mention where below.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Lucas
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Good idea about the example. I added a simple one.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) I have thought about including options for the underlying
> > >>> buffer
> > >>>>>>>>>>>> configuration. One of which might be adding an in memory
> > >>>>>>>>>>>> option.
> > >>> My
> > >>>>>>>>> biggest
> > >>>>>>>>>>>> concern is about the semantic guarantees. This isn't like
> > >>> suppress
> > >>>>> or
> > >>>>>>>>> with
> > > >>>>>>>>>>>> windows where producing incomplete results is relatively
> > >>>>> harmless.
> > >>>>>>>>> Here
> > >>>>>>>>>>>> we would be possibly producing incorrect results. I also
> would
> > >>> like
> > >>>>>>> to
> > >>>>>>>>> keep
> > >>>>>>>>>>>> the interface changes as simple as I can. Making more than
> > this
> > >>>>>>> change
> > >>>>>>>>> to
> > >>>>>>>>>>>> Joined I feel could make this more complicated than it needs
> > to
> > >>> be.
> > >>>>>>> If
> > >>>>>>>>> we
> > >>>>>>>>>>>> really want to I could see adding a grace() option with a
> > > >>>>>>> BufferConfig
> > >>>>>>>>> in
> > >>>>>>>>>>>> there or something, but I would rather not.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) The buffer will be independent of if the table is
> > >>>>>>>>>>>> versioned or
> > >>>>>>> not.
> > >>>>>>>>> If
> > >>>>>>>>>>>> table is not materialized it will materialize it as
> > >>>>>>>>>>>> versioned. It
> > >>>>>>> might
> > >>>>>>>>>>>> make sense to do a follow up kip where we force the
> retention
> > >>>>> period
> > >>>>>>>>> of
> > >>>>>>>>>>>> the versioned to be greater than whatever the max of the
> > stream
> > >>>>>>> buffer
> > >>>>>>>>> is.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Victoria
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) Yes, records will exit in timestamp order not in offset
> > >>>>>>>>>>>> order.
> > >>>>>>>>>>>> 2) Late records will be dropped (Late as out of the grace
> > >>> period).
> > > >>>>>>>>> From my
> > >>>>>>>>>>>> understanding that is the point of a grace period, no?
> Doesn't
> > >>> the
> > >>>>>>> same
> > >>>>>>>>>>>> thing happen with versioned stores?
> > >>>>>>>>>>>> 3) The segment store already has an observed stream time, we
> > >>>>> advance
> > >>>>>>>>> based
> > >>>>>>>>>>>> on that. That should only advance based on records that
> > >>>>>>>>>>>> enter the
> > >>>>>>>>> store. So
> > >>>>>>>>>>>> yes, only stream side records. We could maybe do an
> > improvement
> > >>>>> later
> > >>>>>>>>> to
> > >>>>>>>>>>>> advance stream time from table side as well, but that might
> be
> > >>>>>>>>> debatable as
> > >>>>>>>>>>>> we might get more late records. Anyways I would rather have
> > >>>>>>>>>>>> that
> > >>>>> as a
> > >>>>>>>>>>>> separate discussion.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> in memory option? We can do that, for the buffer I plan to
> use
> > >>> the
> > >>>>>>>>>>>> TimeOrderedKeyValueBuffer interface which already has an in
> > >>> memory
> > > >>>>>>>>>>>> implementation, so it would be simple.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I said more in my answer to Lucas's question. The concern I
> > >>>>>>>>>>>> have
> > >>>>> with
> > >>>>>>>>>>>> buffer configs or in memory is complicating the interface.
> > Also
> > >>>>>>>>> semantic
> > > >>>>>>>>>>>> guarantees, but in memory shouldn't affect that.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) fixed out of order vs late terminology in the kip.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) I was referring to having a stream. So after this kip we
> > can
> > >>>>> have
> > >>>>>>> a
> > >>>>>>>>>>>> buffered stream or a normal one. For the table we can use a
> > >>>>> versioned
> > >>>>>>>>> table
> > >>>>>>>>>>>> or a normal table.
> > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3) Good call out. I clarified this as "If the table side
> uses a
> > >>>>>>>>> materialized
> > >>>>>>>>>>>> version store, it can store multiple versions of each record
> > >>> within
> > >>>>>>> its
> > >>>>>>>>>>>> defined grace period." and modified the rest of the
> paragraph
> > a
> > >>>>> bit.
> > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4) I get the preserving of offset ordering, but if the
> > >>>>>>>>>>>> stream is
> > >>>>>>>>> buffered
> > >>>>>>>>>>>> to join on timestamp instead of offset doesn't it already
> seem
> > >>> like
> > >>>>>>> we
> > >>>>>>>>> care
> > >>>>>>>>>>>> more about time in this case?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If we end up adding more options it might make sense to do
> > >>>>>>>>>>>> this.
> > >>>>>>> Maybe
> > >>>>>>>>>>>> offset order processing can be a follow up?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'll add a section for this in Rejected Alternatives. I
> > >>>>>>>>>>>> think it
> > >>>>>>> makes
> > >>>>>>>>>>>> sense to do something like this but maybe in a follow up.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 5) I hadn't thought about this. I suppose if they changed
> > >>>>>>>>>>>> this in
> > >>>>> an
> > >>>>>>>>>>>> upgrade the next record would either evict a lot of records
> > (if
> > >>> the
> > >>>>>>>>> grace
> > >>>>>>>>>>>> period decreased) or there would be a pause until the new
> > grace
> > >>>>>>> period
> > > >>>>>>>>>>>> is reached. Increasing is a bit more problematic, especially if
> > >>>>>>>>>>>> the
> > >>>>>>> table
> > >>>>>>>>>>>> grace period and retention time stays the same. If the data
> is
> > >>>>>>>>> reprocessed
> > >>>>>>>>>>>> after a change like that then there would be different
> > results,
> > >>>>> but I
> > >>>>>>>>> feel
> > >>>>>>>>>>>> like that would be expected after such a change.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> What do you think should happen?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hopefully this answers your questions!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Walker
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <
> > >>> mj...@apache.org>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the KIP! Also some questions/comments from my
> side:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 10) Notation: you use the term "late data" but I think you
> > >>>>>>>>>>>>> mean
> > >>>>>>>>>>>>> out-of-order. We reserve the term "late" to records that
> > >>>>>>>>>>>>> arrive
> > >>>>>>> after
> > >>>>>>>>>>>>> grace period passed, and thus, "late == out-of-order data
> > that
> > >>> is
> > >>>>>>>>>>>> dropped".
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 20) "There is only one option from the stream side and only
> > >>>>> recently
> > >>>>>>>>> is
> > >>>>>>>>>>>>> there a second option on the table side."
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What are those options? Victoria already asked about the
> > table
> > >>>>> side,
> > >>>>>>>>> but
> > >>>>>>>>>>>>> I am also not sure what option you mean for the stream
> side?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 30) "If the table side uses a materialized version store
> the
> > >>> value
> > >>>>>>> is
> > >>>>>>>>>>>>> the latest by stream time rather than by offset within its
> > >>> defined
> > >>>>>>>>> grace
> > >>>>>>>>>>>>> period."
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The phrase "the value is the latest by stream time" is
> > >>>>>>>>>>>>> confusing
> > >>>>> --
> > >>>>>>> in
> > > >>>>>>>>>>>>> the end, a versioned store stores multiple versions, not just
> one.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 40) I am also wondering about ordering. In general, KS
> > >>>>>>>>>>>>> tries to
> > >>>>>>>>> preserve
> > >>>>>>>>>>>>> offset-order during processing (with some exception, when
> > >>>>>>>>>>>>> offset
> > >>>>>>> order
> > >>>>>>>>>>>>> preservation is not clearly defined). Given that the
> > >>>>>>>>>>>>> stream-side
> > >>>>>>>>> buffer
> > >>>>>>>>>>>>> is really just a "linear buffer", we could easily preserve
> > >>>>>>>>> offset-order.
> > >>>>>>>>>>>>> But I also see a benefit of re-ordering and emitting
> > >>> out-of-order
> > >>>>>>> data
> > >>>>>>>>>>>>> right away when read (instead of blocking them behind
> > in-order
> > >>>>>>> records
> > >>>>>>>>>>>>> that are not ready yet). -- It might even be a possibility,
> > to
> > >>> let
> > >>>>>>>>> users
> > > >>>>>>>>>>>>> pick an emit strategy, e.g. "EmitStrategy.preserveOffsets"
> (name
> > >>> just
> > >>>>> a
> > >>>>>>>>>>>>> placeholder).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The KIP should explain this in more detail and also discuss
> > >>>>>>> different
> > >>>>>>>>>>>>> options and mention them in "Rejected alternatives" in case
> > we
> > >>>>> don't
> > >>>>>>>>>>>>> want to include them.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 50) What happens when users change the grace period?
> > >>>>>>>>>>>>> Especially,
> > >>>>>>> when
> > >>>>>>>>>>>>> they turn it on/off (but also increasing/decreasing is an
> > >>>>>>> interesting
> > >>>>>>>>>>>>> point)? I think we should try to support this if possible;
> > the
> > >>>>>>>>>>>>> "Compatibility" section needs to cover switching on/off in
> > >>>>>>>>>>>>> more
> > >>>>>>>>> detail.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 5/2/23 2:06 PM, Victoria Xia wrote:
> > >>>>>>>>>>>>>> Cool KIP, Walker! Thanks for sharing this proposal.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> A few clarifications:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. Is the order that records exit the buffer in
> > >>>>>>>>>>>>>> necessarily the
> > >>>>>>> same
> > >>>>>>>>> as
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>> order that records enter the buffer in, or no? Based on
> the
> > >>>>>>>>> description
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>> the KIP, it sounds like the answer is no, i.e., records
> will
> > >>> exit
> > >>>>>>> the
> > >>>>>>>>>>>>>> buffer in increasing timestamp order, which means that
> > >>>>>>>>>>>>>> they may
> > >>>>> be
> > > >>>>>>>>>>>>> reordered
> > >>>>>>>>>>>>>> (even for the same key) compared to the input order.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2. What happens if the join grace period is nonzero, and a
> > >>>>>>>>> stream-side
> > >>>>>>>>>>>>>> record arrives with a timestamp that is older than the
> > >>>>>>>>>>>>>> current
> > >>>>>>> stream
> > >>>>>>>>>>>>> time
> > >>>>>>>>>>>>>> minus the grace period? Will this record trigger a join
> > >>>>>>>>>>>>>> result,
> > >>>>> or
> > >>>>>>>>> will
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>> be dropped? Based on the description for what happens when
> > >>>>>>>>>>>>>> the
> > >>>>> join
> > >>>>>>>>>>>> grace
> > >>>>>>>>>>>>>> period is set to zero, it sounds like the late record will
> > be
> > >>>>>>>>> dropped,
> > >>>>>>>>>>>>> even
> > >>>>>>>>>>>>>> if the join grace period is nonzero. Is that true?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3. What could cause stream time to advance, for purposes
> of
> > >>>>>>> removing
> > >>>>>>>>>>>>>> records from the join buffer? For example, will new
> records
> > >>>>>>> arriving
> > >>>>>>>>> on
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>> table side of the join cause stream time to advance? From
> > the
> > >>> KIP
> > >>>>>>> it
> > >>>>>>>>>>>>> sounds
> > >>>>>>>>>>>>>> like only stream-side records will advance stream time --
> > >>>>>>>>>>>>>> does
> > >>>>> that
> > >>>>>>>>>>>> mean
> > >>>>>>>>>>>>>> that the join processor itself will have to track this
> > stream
> > >>>>> time?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Also +1 to Lucas's question about what options will be
> > >>> available
> > >>>>>>> for
> > >>>>>>>>>>>>>> configuring the join buffer. Will users have the option to
> > >>> choose
> > >>>>>>>>>>>> whether
> > >>>>>>>>>>>>>> they want the buffer to be in-memory vs persistent?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - Victoria
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
> > >>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Walker,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> thanks for the KIP! We definitely need this. I have two
> > >>>>> questions:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>        - Have you considered allowing the customization
> > >>>>>>>>>>>>>>> of the
> > >>>>>>>>> underlying
> > >>>>>>>>>>>>>>> buffer implementation? As I can see, `StreamJoined` lets
> > you
> > >>>>>>>>> customize
> > >>>>>>>>>>>>>>> the underlying store via a `WindowStoreSupplier`. Would
> it
> > >>> make
> > >>>>>>>>> sense
> > >>>>>>>>>>>>>>> for `Joined` to have this as well? I can imagine one may
> > >>>>>>>>>>>>>>> want
> > >>> to
> > >>>>>>>>> limit
> > >>>>>>>>>>>>>>> the number of records in the buffer, for example. If we
> hit
> > >>> the
> > >>>>>>>>>>>>>>> maximum, the only option would be to drop semantic
> > >>>>>>>>>>>>>>> guarantees,
> > >>>>> but
> > >>>>>>>>>>>>>>> users may still want to do this.
> > >>>>>>>>>>>>>>>        - With "second option on the table side" you are
> > >>> referring
> > >>>>> to
> > >>>>>>>>>>>>>>> versioned tables, right? Will the buffer on the stream
> side
> > >>>>> behave
> > >>>>>>>>> any
> > >>>>>>>>>>>>>>> different whether the table side is versioned or not?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Finally, I think a simple example in the motivation
> section
> > >>>>> could
> > >>>>>>>>> help
> > >>>>>>>>>>>>>>> non-experts understand the KIP.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>> Lucas
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
> > >>>>>>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hello everybody,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I have a stream proposal to improve the stream table
> > >>>>>>>>>>>>>>>> join by
> > >>>>>>>>> adding a
> > >>>>>>>>>>>>>>> grace
> > >>>>>>>>>>>>>>>> period and buffer to the stream side of the join to
> allow
> > >>>>>>>>> processing
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>> timestamp order matching the recent improvements of the
> > >>>>> versioned
> > >>>>>>>>>>>>> tables.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Please take a look here <
> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/x/lAs0Dw>
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> share your thoughts.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> best,
> > >>>>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
>
