Handling the "changelog" topic for the buffer in the same way as repartition topics makes sense. Thanks for clearing that up!
- Victoria

On Tue, Jun 6, 2023 at 10:22 AM Walker Carlson <wcarl...@confluent.io.invalid> wrote:

> Good point, Victoria. I just removed the compacted topic mention from the
> KIP. I agree with Bruno about using a normal topic and deleting records
> that have been processed.
>
> On Tue, Jun 6, 2023 at 2:28 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi,
> >
> > another idea that came to my mind. Instead of using a compacted topic,
> > the buffer could use a non-compacted topic and regularly delete records
> > before a given offset as Streams does for repartition topics.
> >
> > Best,
> > Bruno
> >
> > On 05.06.23 21:48, Bruno Cadonna wrote:
> > > Hi Victoria,
> > >
> > > that is a good point!
> > >
> > > I think the topic needs to be a compacted topic to be able to get rid
> > > of records that are evicted from the buffer. So the key might be
> > > something with the key, the timestamp, and a sequence number to
> > > distinguish between records with the same key and same timestamp.
> > >
> > > Just an idea! Maybe Walker comes up with something better.
> > >
> > > Best,
> > > Bruno
> > >
> > > On 05.06.23 20:38, Victoria Xia wrote:
> > >> Hi Walker,
> > >>
> > >> Thanks for the latest updates! The KIP looks great. Just one question
> > >> about the changelog topic for the join buffer: The KIP says "When a
> > >> failure occurs the buffer will try to recover from an OffsetCheckpoint
> > >> if possible. If not it will reload the buffer from a compacted
> > >> change-log topic." This is a new changelog topic that will be
> > >> introduced specifically for the join buffer, right? Why is the
> > >> changelog topic compacted? What are the keys? I am confused because
> > >> the buffer contains records from the stream-side of the join, for
> > >> which multiple records with the same key should be treated as
> > >> separate updates which must all be tracked in the buffer, rather than
> > >> updates which replace each other.
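
The question about keys can be made concrete with a small toy model. The Python sketch below is not Kafka or Kafka Streams code: it treats log compaction simply as "keep the latest value per key", and the function name `compact` and the record layout are assumptions made for illustration. It shows that buffered stream records sharing a record key collapse into one, whereas a composite key of record key, timestamp, and sequence number (the idea floated in the thread) keeps them distinct.

```python
def compact(log):
    """Toy model of log compaction: keep only the latest value per key."""
    latest = {}
    for key, value in log:
        latest[key] = value  # a later record with the same key replaces the earlier one
    return latest


# Three stream-side records buffered for the same record key "A".
# Each tuple is (record_key, timestamp, value); two of them share timestamp 5.
buffered = [("A", 5, "v1"), ("A", 5, "v2"), ("A", 7, "v3")]

# Keyed by record key only: compaction collapses the three records into one.
by_record_key = compact([(k, v) for k, ts, v in buffered])

# Keyed by (record key, timestamp, sequence number): all three records survive.
by_composite_key = compact(
    [((k, ts, seq), v) for seq, (k, ts, v) in enumerate(buffered)]
)

print(by_record_key)
print(by_composite_key)
```

The sequence number is what separates the two records that share both key and timestamp. With a non-compacted topic that is truncated below a given offset, as proposed later in the thread, no such key scheme would be needed.
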
> > >>
> > >> Thanks,
> > >> Victoria
> > >>
> > >> On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>
> > >>> Hi Walker,
> > >>>
> > >>> Thanks once more for the updates to the KIP!
> > >>>
> > >>> Do you also plan to expose metrics for the buffer?
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 02.06.23 17:16, Walker Carlson wrote:
> > >>>> Hello Bruno,
> > >>>>
> > >>>> I think this covers your questions. Let me know what you think.
> > >>>>
> > >>>> 2.
> > >>>> We can use a changelog topic. I think we can treat it like any other
> > >>>> store and recover in the usual manner. Also, the implementation is
> > >>>> on disk.
> > >>>>
> > >>>> 3.
> > >>>> The description is in the public interfaces description. I will copy
> > >>>> it into the proposed changes as well.
> > >>>>
> > >>>> This is a bit of an implementation detail that I didn't want to add
> > >>>> into the KIP, but the record will be added to the buffer to keep the
> > >>>> stream time consistent; it will just be ejected immediately. Of
> > >>>> course, if this causes performance issues we will skip this step and
> > >>>> track stream time separately. I will update the KIP to say that
> > >>>> stream time advances when a stream record enters the node.
> > >>>>
> > >>>> Also, yes, updated.
> > >>>>
> > >>>> 5.
> > >>>> No, there is no difference right now; everything gets processed as
> > >>>> it comes in and tries to find a record for its timestamp.
> > >>>>
> > >>>> Walker
> > >>>>
> > >>>> On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>>>
> > >>>>> Hi Walker,
> > >>>>>
> > >>>>> Thanks for the updates!
> > >>>>>
> > >>>>> 2.
> > >>>>> It is still not clear to me how a failure is handled. I do not
> > >>>>> understand what you mean by "recover from an OffsetCheckpoint".
> > >>>>>
> > >>>>> My understanding is that the buffer needs to be replicated into its
> > >>>>> own Kafka topic. The input topic is not enough. The offset of a
> > >>>>> record is added to the offsets to commit once the record is
> > >>>>> streamed through the subtopology. That means once the record is
> > >>>>> added to the buffer its offset is added to the offsets to commit --
> > >>>>> independently of whether the record was evicted from the buffer and
> > >>>>> sent to the join node or not.
> > >>>>> Now, let's assume the following scenario:
> > >>>>> 1. A record is read from the input topic and added to the buffer,
> > >>>>> but not evicted to be processed by the join node.
> > >>>>> 2. When the processing of the subtopology finishes, the offset of
> > >>>>> the record is added to the offsets to commit.
> > >>>>> 3. A commit happens.
> > >>>>> 4. A failure happens.
> > >>>>>
> > >>>>> After the failure the buffer is empty but the record will not be
> > >>>>> read anymore from the input topic since its offset has already been
> > >>>>> committed. The record is lost.
> > >>>>> One solution to avoid the loss is to recreate the buffer from a
> > >>>>> compacted Kafka topic as we do for suppression buffers. I do not
> > >>>>> think we need any offset checkpoint here since we keep the buffer
> > >>>>> in memory, right? Or do you plan to back the buffer with a
> > >>>>> persistent store? Even in that case, a compacted Kafka topic would
> > >>>>> be needed.
> > >>>>>
> > >>>>>
> > >>>>> 3.
> > >>>>> From the KIP it is still not clear to me what happens if a record
> > >>>>> is outside of the grace period. I guess the record that falls
> > >>>>> outside of the grace period will not be added to the buffer, but
> > >>>>> will be sent to the join node. Since it is outside of the grace
> > >>>>> period it will also not increase stream time and it will not
> > >>>>> trigger an eviction.
> > >>>>> Also the head of the buffer will not contain a record that needs to
> > >>>>> be evicted since the timestamp of the head record will be within
> > >>>>> the interval stream time minus grace period. Is this correct?
> > >>>>> Please add such a description to the KIP.
> > >>>>> Furthermore, I think there is a mistake in the text:
> > >>>>> "... will dequeue when the record timestamp is greater than stream
> > >>>>> time plus the grace period". I guess that should be "... will
> > >>>>> dequeue when the record timestamp is less than (or equal?) stream
> > >>>>> time minus the grace period"
> > >>>>>
> > >>>>>
> > >>>>> 5.
> > >>>>> What is the difference between not setting the grace period and
> > >>>>> setting it to zero? If there is a difference, why is there a
> > >>>>> difference?
> > >>>>>
> > >>>>>
> > >>>>> Best,
> > >>>>> Bruno
> > >>>>>
> > >>>>>
> > >>>>> On 01.06.23 23:58, Walker Carlson wrote:
> > >>>>>> Hey Bruno, thanks for the feedback.
> > >>>>>>
> > >>>>>> 1)
> > >>>>>> I will add this to the KIP, but stream time only advances when the
> > >>>>>> buffer receives a new record.
> > >>>>>>
> > >>>>>> 2)
> > >>>>>> You are correct, I will add a failure section to the KIP. Since
> > >>>>>> the records won't change in the buffer from when they are read
> > >>>>>> from the topic, they are replicated already.
> > >>>>>>
> > >>>>>> 3)
> > >>>>>> I see that I'm outvoted on the dropping of records thing. We will
> > >>>>>> pass them on and try to join them if possible. This might cause
> > >>>>>> some null results, but increasing the table history retention
> > >>>>>> should help with that.
> > >>>>>>
> > >>>>>> 4)
> > >>>>>> I can add some words on this to the KIP. But it's pretty directly
> > >>>>>> adding whatever the grace period is to the latency. I don't see a
> > >>>>>> way around it.
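
The dequeue rule and the latency remark above can be sketched as a toy buffer. This is a simplified Python model, not the Kafka Streams implementation: the class name `GraceBuffer` is hypothetical, stream time is taken to be the largest stream-side timestamp seen so far, and the boundary is assumed to be inclusive (`timestamp <= stream time - grace`), which the thread leaves open.

```python
import heapq


class GraceBuffer:
    """Toy time-ordered buffer; not the Kafka Streams implementation."""

    def __init__(self, grace):
        self.grace = grace
        self.stream_time = float("-inf")
        self._heap = []  # entries are (timestamp, arrival sequence, value)
        self._seq = 0

    def add(self, timestamp, value):
        """Buffer one stream record and return the records it releases."""
        # Stream time only advances when the buffer receives a record.
        self.stream_time = max(self.stream_time, timestamp)
        heapq.heappush(self._heap, (timestamp, self._seq, value))
        self._seq += 1
        released = []
        # Assumed boundary: release when timestamp <= stream time - grace.
        while self._heap and self._heap[0][0] <= self.stream_time - self.grace:
            ts, _, v = heapq.heappop(self._heap)
            released.append((ts, v))
        return released


buf = GraceBuffer(grace=10)
out1 = buf.add(5, "a")   # nothing released yet
out2 = buf.add(3, "b")   # out of order but within grace: stays buffered
out3 = buf.add(15, "c")  # stream time 15 releases timestamps <= 5, in time order
out4 = buf.add(1, "d")   # late (1 <= 15 - 10): enters and leaves immediately
print(out1, out2, out3, out4)
```

In this model a record waits until stream time has moved at least one grace period past its own timestamp, which is the latency cost mentioned in point 4. A late record is released as soon as it is added, matching the "added, then ejected immediately" behaviour described earlier in the thread.
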
> > >>>>>>
> > >>>>>> Walker
> > >>>>>>
> > >>>>>> On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi Walker,
> > >>>>>>>
> > >>>>>>> thanks for the KIP!
> > >>>>>>>
> > >>>>>>> Here is my feedback:
> > >>>>>>>
> > >>>>>>> 1.
> > >>>>>>> It is still not clear to me when stream time for the buffer
> > >>>>>>> advances. What is the event that lets the stream time advance? In
> > >>>>>>> the discussion, I do not understand what you mean by "The segment
> > >>>>>>> store already has an observed stream time, we advance based on
> > >>>>>>> that. That should only advance based on records that enter the
> > >>>>>>> store." Where does this segment store come from? Anyways, I think
> > >>>>>>> it would be great to also state how stream time advances in the
> > >>>>>>> KIP.
> > >>>>>>>
> > >>>>>>> 2.
> > >>>>>>> How does the buffer behave in case of a failure? I think I
> > >>>>>>> understand that the buffer will use an implementation of
> > >>>>>>> TimeOrderedKeyValueBuffer and therefore the records in the buffer
> > >>>>>>> will be replicated to a topic in Kafka, but I am not completely
> > >>>>>>> sure. Could you elaborate on this in the KIP?
> > >>>>>>>
> > >>>>>>> 3.
> > >>>>>>> I agree with Matthias about dropping late records. We use grace
> > >>>>>>> periods in scenarios where records are grouped like in windowed
> > >>>>>>> aggregations and windowed joins. The stream buffer you propose
> > >>>>>>> does not really group any records. It rather delays records and
> > >>>>>>> reorders them. I am not sure if grace period is the right
> > >>>>>>> naming/concept to apply here. Instead of dropping records that
> > >>>>>>> fall outside of the buffer's time interval the join should skip
> > >>>>>>> the buffer and try to join the record immediately.
> > >>>>>>> In the end, a stream-table join is an unwindowed join, i.e., no
> > >>>>>>> grouping is applied to the records.
> > >>>>>>> What do you and other folks think about this proposal?
> > >>>>>>>
> > >>>>>>> 4.
> > >>>>>>> How does the proposed buffer affect processing latency? Could you
> > >>>>>>> please add some words about this to the KIP?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Bruno
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 31.05.23 01:49, Walker Carlson wrote:
> > >>>>>>>> Thanks for all the additional comments. I will either address
> > >>>>>>>> them here or update the KIP accordingly.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> I mentioned a follow-up KIP to add extra features before and in
> > >>>>>>>> the responses. I will try to briefly summarize what options and
> > >>>>>>>> optimizations I plan to include. If a concern is not covered in
> > >>>>>>>> this list I will for sure talk about it below.
> > >>>>>>>>
> > >>>>>>>> * Allowing non-versioned tables to still use the stream buffer
> > >>>>>>>> * Automatically materializing tables instead of forcing the user
> > >>>>>>>>   to do it
> > >>>>>>>> * Configurable for in-memory buffer
> > >>>>>>>> * Order the records in offset order or in time order
> > >>>>>>>> * Non-memory-use buffer (offset order, delayed pull from stream.)
> > >>>>>>>> * Time synced between stream and table side (maybe)
> > >>>>>>>> * Do not drop late records and process them as they come in
> > >>>>>>>>   instead.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> First, Victoria.
> > >>>>>>>>
> > >>>>>>>> 1) (One of your nits covers this, but you are correct it doesn't
> > >>>>>>>> make sense, so I removed that part of the example.)
> > >>>>>>>> For those examples with the "bad" join results I said without
> > >>>>>>>> buffering the stream it would look like that, but that was
> > >>>>>>>> incomplete. If the lookup was simply looking at the latest
> > >>>>>>>> version of the table when the stream records came in then the
> > >>>>>>>> results were possible. If we are using the point-in-time lookup
> > >>>>>>>> that versioned tables let us do then you are correct, the future
> > >>>>>>>> results are not possible.
> > >>>>>>>>
> > >>>>>>>> 2) I'll get to this later as Matthias brought up something
> > >>>>>>>> related.
> > >>>>>>>>
> > >>>>>>>> To your additional thoughts, I agree that we need to call those
> > >>>>>>>> things out in the documentation. I'm writing up a follow-up KIP
> > >>>>>>>> with a lot of the ideas we have discussed so that we can improve
> > >>>>>>>> this feature beyond the base implementation if it's needed.
> > >>>>>>>>
> > >>>>>>>> I addressed the nits in the KIP. I somehow missed the
> > >>>>>>>> stream-table join processor improvement; it makes your first
> > >>>>>>>> question make a lot more sense. Table history retention is a
> > >>>>>>>> much cleaner way to describe it.
> > >>>>>>>>
> > >>>>>>>> As to your mention of syncing the time for the table and stream:
> > >>>>>>>> Matthias mentioned that as well. I will address both here. I
> > >>>>>>>> plan to bring that up in the future, but for now we will leave
> > >>>>>>>> it out. I suppose it will be more useful after the table history
> > >>>>>>>> retention is separable from the table grace period.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> To address Matthias's comments.
> > >>>>>>>>
> > >>>>>>>> You are correct in saying the in-memory store shouldn't cause any
> > >>>>>>>> semantic concerns. My concern would be more with if we limited
> > >>>>>>>> the number of records on the buffer and what we would do if we
> > >>>>>>>> hit said limits (emitting those records might be an issue,
> > >>>>>>>> throwing an error and halting would not). I think we can leave
> > >>>>>>>> this discussion to the follow-up KIP along with a few other
> > >>>>>>>> options.
> > >>>>>>>>
> > >>>>>>>> I will go through your proposals now.
> > >>>>>>>>
> > >>>>>>>>     - don't support non-versioned KTables
> > >>>>>>>>
> > >>>>>>>> Sure, we can always expand this later on. Will include as part of
> > >>>>>>>> the improvement KIP.
> > >>>>>>>>
> > >>>>>>>>     - if grace period is added, users need to explicitly
> > >>>>>>>>     materialize the table as versioned (either directly, or
> > >>>>>>>>     upstream. Upstream only works if downstream tables "inherit"
> > >>>>>>>>     versioned semantics -- cf KIP-914)
> > >>>>>>>>
> > >>>>>>>> Again, that works for me for now; if we find a use we can always
> > >>>>>>>> add later.
> > >>>>>>>>
> > >>>>>>>>     - the table's history retention time must be larger than the
> > >>>>>>>>     grace period (should be easy to check at runtime, when we
> > >>>>>>>>     build the topology)
> > >>>>>>>>
> > >>>>>>>> Agreed.
> > >>>>>>>>
> > >>>>>>>>     - because switching from non-versioned to versioned stores is
> > >>>>>>>>     not backward compatible (cf KIP-914), users need to take care
> > >>>>>>>>     of this themselves, and this also implies that adding grace
> > >>>>>>>>     period is not a backward compatible change (even only if via
> > >>>>>>>>     indirect means)
> > >>>>>>>>
> > >>>>>>>> Sure, this works.
> > >>>>>>>>
> > >>>>>>>> As to the dropping of late records, I'm not sure.
> > >>>>>>>> On one hand I like not dropping things. But on the other I
> > >>>>>>>> struggle to see how a user can filter out late records that might
> > >>>>>>>> have incomplete join results. The point-in-time lookup will
> > >>>>>>>> aggressively expire old data and if new data has been replaced it
> > >>>>>>>> will return null if outside of the retention. This seems like it
> > >>>>>>>> could corrupt the integrity of the join output. Seeing that we
> > >>>>>>>> drop late records on the table side as well I would think it
> > >>>>>>>> makes sense to drop late records on the stream buffer. I could be
> > >>>>>>>> convinced otherwise I suppose; I could see adding this as an
> > >>>>>>>> option in a follow-up KIP. It would be very easy to implement
> > >>>>>>>> either way. For now, unless someone objects, I'm going to stick
> > >>>>>>>> with dropping the records for the sake of getting this KIP
> > >>>>>>>> passed. It is functionally a small change to make and we can
> > >>>>>>>> update later if you feel strongly about it.
> > >>>>>>>>
> > >>>>>>>> For the ordering: I have to say that it would be more complicated
> > >>>>>>>> to implement it to be in offset order, if the goal is to get as
> > >>>>>>>> many of the records validly joined as possible. Because we would
> > >>>>>>>> process as things left the buffer, a sufficiently early record
> > >>>>>>>> could hold up records that would otherwise be valid past the
> > >>>>>>>> table history retention. To fix this we could process by
> > >>>>>>>> timestamp then store in a second queue and emit by offset, but
> > >>>>>>>> that would be a lot more complicated.
> > >>>>>>>> If we didn't care about not missing some valid joins we could
> > >>>>>>>> just have no store and pull from the topic at a delay, only
> > >>>>>>>> caring about the timestamp of the next offset. For now I want to
> > >>>>>>>> stick with the timestamp ordering as it makes much more sense to
> > >>>>>>>> me, but would propose we add both of the other options I have
> > >>>>>>>> laid out here in the follow-up KIP.
> > >>>>>>>>
> > >>>>>>>> Lastly, I think having an empty store with zero grace period
> > >>>>>>>> would be super simple and not costly, so we might as well make it
> > >>>>>>>> even if nothing gets entered.
> > >>>>>>>>
> > >>>>>>>> I hope that addresses all your concerns,
> > >>>>>>>>
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>
> > >>>>>>>>> Walker,
> > >>>>>>>>>
> > >>>>>>>>> thanks for the updates. The KIP itself reads fine (of course
> > >>>>>>>>> Victoria made good comments about some phrases), but there are a
> > >>>>>>>>> couple of things from your latest reply I don't understand, and
> > >>>>>>>>> that I still think need some more discussion.
> > >>>>>>>>>
> > >>>>>>>>> Lucas asked about an in-memory option and `WindowStoreSupplier`
> > >>>>>>>>> and you mention "semantic concerns". There should not be any
> > >>>>>>>>> semantic difference from the underlying buffer implementation,
> > >>>>>>>>> so I am not sure what you mean here (also the relationship to
> > >>>>>>>>> suppress() is unclear to me)? -- I am ok to not make it
> > >>>>>>>>> configurable for now. We can always do it via a follow-up KIP,
> > >>>>>>>>> and keep interface changes limited for now.
> > >>>>>>>>>
> > >>>>>>>>> Does it really make sense to allow a grace period if the table
> > >>>>>>>>> is non-versioned? You also say: "If table is not materialized it
> > >>>>>>>>> will materialize it as versioned." -- What history retention
> > >>>>>>>>> time would we pick for this case (also asked by Victoria)? Or
> > >>>>>>>>> should we rather not support this and force the user to
> > >>>>>>>>> materialize the table explicitly, and thus explicitly pick a
> > >>>>>>>>> history retention time? It's a tradeoff between usability and
> > >>>>>>>>> guiding users that there will be a significant impact on disk
> > >>>>>>>>> usage. There are also compatibility concerns: If the table is
> > >>>>>>>>> not explicitly materialized in the old program, we would already
> > >>>>>>>>> need to materialize it also in the old program (of course, we
> > >>>>>>>>> would use a non-versioned store so far). Thus, if somebody adds
> > >>>>>>>>> a grace period, we cannot just switch the store type, as it
> > >>>>>>>>> would be a breaking change, potentially requiring an application
> > >>>>>>>>> reset, or following the upgrade path for versioned state stores,
> > >>>>>>>>> and also changing the program to explicitly materialize using a
> > >>>>>>>>> versioned store. Also note that we might not materialize the
> > >>>>>>>>> actual join table, but only an upstream table, and use
> > >>>>>>>>> `ValueGetter` to access the upstream data.
> > >>>>>>>>>
> > >>>>>>>>> To this end, as you already mentioned, history retention of the
> > >>>>>>>>> table should be at least grace period.
> > >>>>>>>>> You proposed to include this in a follow-up KIP, but I am
> > >>>>>>>>> wondering if it's a fundamental requirement and thus we should
> > >>>>>>>>> put a check in place right away and reject an invalid
> > >>>>>>>>> configuration? (It's always easier to lift restrictions than to
> > >>>>>>>>> introduce them later.) This would also imply that a
> > >>>>>>>>> non-versioned table cannot be supported, because it does not
> > >>>>>>>>> have a history retention that is larger than grace period, and
> > >>>>>>>>> maybe also answer the requirement about materialization: as we
> > >>>>>>>>> already always materialize something on the table side as
> > >>>>>>>>> non-versioned store right now, it seems difficult to migrate the
> > >>>>>>>>> store to a versioned store. I.e., it might be ok to push the
> > >>>>>>>>> burden onto the user and say: if you start using grace period,
> > >>>>>>>>> you also need to manually switch from non-versioned to versioned
> > >>>>>>>>> KTables. Doing stuff automatically under the hood is very
> > >>>>>>>>> complex for this case, so if we push the burden onto the user,
> > >>>>>>>>> it might be ok to not complicate this KIP significantly.
> > >>>>>>>>>
> > >>>>>>>>> To summarize the last two paragraphs, I would propose to:
> > >>>>>>>>>     - don't support non-versioned KTables
> > >>>>>>>>>     - if grace period is added, users need to explicitly
> > >>>>>>>>>     materialize the table as versioned (either directly, or
> > >>>>>>>>>     upstream.
> > >>>>>>>>>     Upstream only works if downstream tables "inherit" versioned
> > >>>>>>>>>     semantics -- cf KIP-914)
> > >>>>>>>>>     - the table's history retention time must be larger than the
> > >>>>>>>>>     grace period (should be easy to check at runtime, when we
> > >>>>>>>>>     build the topology)
> > >>>>>>>>>     - because switching from non-versioned to versioned stores
> > >>>>>>>>>     is not backward compatible (cf KIP-914), users need to take
> > >>>>>>>>>     care of this themselves, and this also implies that adding
> > >>>>>>>>>     grace period is not a backward compatible change (even only
> > >>>>>>>>>     if via indirect means)
> > >>>>>>>>>
> > >>>>>>>>> About dropping late records: wondering if we should never drop a
> > >>>>>>>>> stream-side record for a left-join, even if it's late? In
> > >>>>>>>>> general, one thing I observed over the years is that it's easier
> > >>>>>>>>> to keep stuff and let users filter explicitly downstream (or
> > >>>>>>>>> make it configurable), instead of dropping pro-actively, because
> > >>>>>>>>> users have no good way to resurrect records that already got
> > >>>>>>>>> dropped.
> > >>>>>>>>>
> > >>>>>>>>> For ordering, it sounds reasonable to me to only start with one
> > >>>>>>>>> implementation, and maybe make it configurable as a follow-up.
> > >>>>>>>>> However, I am wondering if starting with offset order might be
> > >>>>>>>>> the better option as it seems to align more with what we do so
> > >>>>>>>>> far? So instead of storing records ordered by timestamp, we can
> > >>>>>>>>> just store them ordered by offset, and still "poll" from the
> > >>>>>>>>> buffer based on the head record's timestamp. Or would this
> > >>>>>>>>> complicate the implementation significantly?
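
The two ordering options can be compared with a small Python sketch. It is a toy model under stated assumptions, not Kafka Streams code: the function name `release_order` is hypothetical, stream time is the largest stream-side timestamp seen so far, and a record is released once its timestamp is at or below stream time minus the grace period. With `by_offset=True` the buffer keeps arrival order and polls on the head record's timestamp; otherwise it is sorted by timestamp.

```python
import heapq
from collections import deque


def release_order(records, grace, by_offset):
    """Return (stream_time_at_release, timestamp) pairs for a toy buffer.

    records: timestamps in arrival (offset) order.
    """
    stream_time = float("-inf")
    fifo, heap, released = deque(), [], []
    for ts in records:
        stream_time = max(stream_time, ts)
        if by_offset:
            # Arrival order; only the head's timestamp decides when to poll.
            fifo.append(ts)
            while fifo and fifo[0] <= stream_time - grace:
                released.append((stream_time, fifo.popleft()))
        else:
            # Timestamp order; the smallest timestamp is always the head.
            heapq.heappush(heap, ts)
            while heap and heap[0] <= stream_time - grace:
                released.append((stream_time, heapq.heappop(heap)))
    return released


arrivals = [20, 5, 16, 31]  # timestamp 5 arrives behind timestamp 20
offset_order = release_order(arrivals, grace=10, by_offset=True)
time_order = release_order(arrivals, grace=10, by_offset=False)
print(offset_order)
print(time_order)
```

In offset order the record with timestamp 5 waits behind the head until stream time reaches 31. In timestamp order it leaves at stream time 20. The longer a record is held, the more likely its lookup timestamp has fallen out of the table's history retention, which is the hold-up concern raised for offset ordering.
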
> > >>>>>>>>>
> > >>>>>>>>> I also think it's ok to not "sync" stream-time between the table
> > >>>>>>>>> and the stream in this KIP, but we should consider doing this as
> > >>>>>>>>> a follow-up change (not sure if we would need a KIP or not for a
> > >>>>>>>>> change like this).
> > >>>>>>>>>
> > >>>>>>>>> About increasing/decreasing grace period: what you describe
> > >>>>>>>>> makes sense to me. If decreased, the next record would just
> > >>>>>>>>> trigger emitting a lot of records, and for increase, the buffer
> > >>>>>>>>> would just need to "fill up" again. For reprocessing, getting a
> > >>>>>>>>> different result with a different grace period is expected, so
> > >>>>>>>>> that's ok IMHO. -- There seems to be one special corner case:
> > >>>>>>>>> grace period zero. For this case, we actually don't need any
> > >>>>>>>>> store, and the stream-side could be stateless. I think it can
> > >>>>>>>>> have the same behavior, but if we want to "add / remove" the
> > >>>>>>>>> store dynamically, we need to add specific code for it. For
> > >>>>>>>>> example, even if we start up with a grace period of zero, we
> > >>>>>>>>> would need to check if there is a local store, and still emit
> > >>>>>>>>> everything in it, before we can ditch the store (not sure if
> > >>>>>>>>> that's even easily done at all). Or: we would need to have a
> > >>>>>>>>> store for _all_ cases, even if grace period is zero (the store
> > >>>>>>>>> would be empty all the time though), to avoid super complex
> > >>>>>>>>> code?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 5/25/23 10:53 AM, Lucas Brutschy wrote:
> > >>>>>>>>>> Hi Walker,
> > >>>>>>>>>>
> > >>>>>>>>>> thanks for your responses. That makes sense.
> > >>>>>>>>>> I guess there is always the option to make the implementation
> > >>>>>>>>>> more configurable later on, if users request it. Also thanks
> > >>>>>>>>>> for the clarifications. From my side, the KIP is good to go.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Lucas
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, May 24, 2023 at 11:54 PM Victoria Xia
> > >>>>>>>>>> <victoria....@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the updates, Walker! Looks great, though I do have
> > >>>>>>>>>>> a couple questions about the latest updates:
> > >>>>>>>>>>>
> > >>>>>>>>>>>    1. The new example says that without stream-side
> > >>>>>>>>>>>    buffering, "ex" and "fy" are possible join results. How
> > >>>>>>>>>>>    could those join results happen? The example versioned
> > >>>>>>>>>>>    table suggests that table record "x" has timestamp 2, and
> > >>>>>>>>>>>    table record "y" has timestamp 3. If stream record "e" has
> > >>>>>>>>>>>    timestamp 1, then it can never be joined against record
> > >>>>>>>>>>>    "x", and similarly for stream record "f" with timestamp 2
> > >>>>>>>>>>>    being joined against "y".
> > >>>>>>>>>>>    2. I see in your replies above that "If table is not
> > >>>>>>>>>>>    materialized it will materialize it as versioned" but I
> > >>>>>>>>>>>    don't see this called out in the KIP -- seems worth calling
> > >>>>>>>>>>>    out. Also, what will the history retention for the
> > >>>>>>>>>>>    versioned table be? Will it be the same as the join grace
> > >>>>>>>>>>>    period, or will it be greater?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> And some additional thoughts:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sounds like there are a few things users should watch out for
> > >>>>>>>>>>> when enabling the stream-side buffer:
> > >>>>>>>>>>>
> > >>>>>>>>>>>    - Records will get "stuck" if there are no newer records to
> > >>>>>>>>>>>    advance stream time.
> > >>>>>>>>>>>    - If there are large gaps between the timestamps of
> > >>>>>>>>>>>    stream-side records, then it's possible that versioned
> > >>>>>>>>>>>    store history retention will have expired by the time a
> > >>>>>>>>>>>    record is evicted from the join buffer, leading to a join
> > >>>>>>>>>>>    "miss." For example, if the join grace period and table
> > >>>>>>>>>>>    history retention are both 10, and records come in the
> > >>>>>>>>>>>    order:
> > >>>>>>>>>>>
> > >>>>>>>>>>>    table side t0 with ts=0
> > >>>>>>>>>>>    stream side s1 with ts=1 <-- enters buffer
> > >>>>>>>>>>>    table side t10 with ts=10
> > >>>>>>>>>>>    table side t20 with ts=20
> > >>>>>>>>>>>    stream side s21 with ts=21 <-- evicts record s1 from
> > >>>>>>>>>>>    buffer, but versioned store no longer contains data for
> > >>>>>>>>>>>    ts=1 due to history retention having elapsed
> > >>>>>>>>>>>
> > >>>>>>>>>>>    This will result in the join result (s1, null) even though
> > >>>>>>>>>>>    it should've been (s1, t0), due to t0 having been expired
> > >>>>>>>>>>>    from the versioned store already.
> > >>>>>>>>>>>    - Out-of-order records from the stream-side will be
> > >>>>>>>>>>>    reordered, and late records will be dropped.
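
The join "miss" in the example above can be replayed with a short Python simulation. It is a toy model, not Kafka Streams code, and rests on labelled assumptions: a single key, table records arriving in timestamp order, a versioned table that returns nothing for lookups older than its observed time minus history retention, and a stream buffer that releases records once their timestamp is at or below stream time minus the grace period. The names `ToyVersionedTable` and `run` are hypothetical.

```python
import heapq


class ToyVersionedTable:
    """Single-key versioned table with history retention (toy model)."""

    def __init__(self, history_retention):
        self.history_retention = history_retention
        self.observed_time = float("-inf")
        self.versions = []  # (timestamp, value), appended in timestamp order

    def put(self, timestamp, value):
        self.observed_time = max(self.observed_time, timestamp)
        self.versions.append((timestamp, value))

    def get_as_of(self, timestamp):
        # Assumed rule: lookups older than the retention bound return nothing.
        if timestamp < self.observed_time - self.history_retention:
            return None
        valid = [v for ts, v in self.versions if ts <= timestamp]
        return valid[-1] if valid else None


def run(events, grace, history_retention):
    """Replay (side, timestamp, value) events and return the join results."""
    table = ToyVersionedTable(history_retention)
    buffer, stream_time, results = [], float("-inf"), []
    for side, ts, value in events:
        if side == "table":
            table.put(ts, value)
            continue
        # Only stream-side records advance the buffer's stream time.
        stream_time = max(stream_time, ts)
        heapq.heappush(buffer, (ts, value))
        while buffer and buffer[0][0] <= stream_time - grace:
            rec_ts, rec = heapq.heappop(buffer)
            results.append((rec, table.get_as_of(rec_ts)))
    return results


events = [
    ("table", 0, "t0"),
    ("stream", 1, "s1"),
    ("table", 10, "t10"),
    ("table", 20, "t20"),
    ("stream", 21, "s21"),
]
buffered = run(events, grace=10, history_retention=10)
unbuffered = run(events, grace=0, history_retention=10)
print(buffered)
print(unbuffered)
```

With a grace period of 10, s1 is only looked up once s21 arrives, by which point the table's retention bound has moved to 10 and the lookup at ts=1 finds nothing. With a grace period of 0 the same record joins against t0 right away. In the buffered run s21 itself stays in the buffer, illustrating the "stuck" case from the first bullet.
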
> > >>>>>>>>>>>
> > >>>>>>>>>>> I don't think any of these are reasons to not go forward with
> > >>>>>>>>>>> this KIP, but it'd be good to call them out in the eventual
> > >>>>>>>>>>> documentation to decrease the chance users get tripped up.
> > >>>>>>>>>>>
> > >>>>>>>>>>>> We could maybe do an improvement later to advance stream
> > >>>>>>>>>>>> time from table side as well, but that might be debatable as
> > >>>>>>>>>>>> we might get more late records.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Yes, the likelihood of late records increases but also the
> > >>>>>>>>>>> likelihood of "join misses" due to versioned store history
> > >>>>>>>>>>> retention having elapsed decreases, which feels important for
> > >>>>>>>>>>> certain use cases. Either way, agreed that it can be a
> > >>>>>>>>>>> discussion for the future as incorporating this would
> > >>>>>>>>>>> substantially complicate the implementation.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also a couple nits:
> > >>>>>>>>>>>
> > >>>>>>>>>>>    - The KIP currently says "We recently added versioned
> > >>>>>>>>>>>    tables which allow the table side of the a join [...] but
> > >>>>>>>>>>>    it is not taken advantage of in joins," but this doesn't
> > >>>>>>>>>>>    seem true? If the table of a stream-table join is
> > >>>>>>>>>>>    versioned, then the DSL's stream-table join processor will
> > >>>>>>>>>>>    automatically perform timestamped lookups into the table,
> > >>>>>>>>>>>    in order to take advantage of the new timestamp-aware store
> > >>>>>>>>>>>    to provide better join semantics.
> > >>>>>>>>>>>    - The KIP mentions "grace period" for versioned stores in a
> > >>>>>>>>>>>    number of places but I think you actually mean "history
> > >>>>>>>>>>>    retention"? The two happen to be the same today (it is not
> > >>>>>>>>>>>    an option for users to configure the two separately) but
> > >>>>>>>>>>>    this need not be true in the future. "History retention"
> > >>>>>>>>>>>    governs how far back in time reads may occur, which is the
> > >>>>>>>>>>>    relevant parameter for performing lookups as part of the
> > >>>>>>>>>>>    stream-table join. "Grace period" in the context of
> > >>>>>>>>>>>    versioned stores refers to how far back in time
> > >>>>>>>>>>>    out-of-order writes may occur, which probably isn't
> > >>>>>>>>>>>    directly relevant for introducing a stream-side buffer,
> > >>>>>>>>>>>    though it's also possible I've overlooked something. (As a
> > >>>>>>>>>>>    bonus, switching from "table grace period" in the KIP to
> > >>>>>>>>>>>    "table history retention" also helps to clarify/distinguish
> > >>>>>>>>>>>    that it's a different parameter from the "join grace
> > >>>>>>>>>>>    period," which I could see being confusing to readers. :) )
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Victoria
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, May 18, 2023 at 1:43 PM Walker Carlson
> > >>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hey all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the comments, they gave me a lot to think about.
> > >>>>>>>>>>>> I'll try to address them all in order.
> > >>>>>>>>>>>> I have made some updates to the KIP related to them, and I mention where below.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Lucas
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Good idea about the example. I added a simple one.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) I have thought about including options for the underlying buffer configuration. One of them might be adding an in-memory option. My biggest concern is about the semantic guarantees. This isn't like suppress or windows, where producing incomplete results is relatively harmless. Here we would possibly be producing incorrect results. I also would like to keep the interface changes as simple as I can. Making more than this change to Joined could, I feel, make this more complicated than it needs to be. If we really want to, I could see adding a grace() option with a BufferConfig in there or something, but I would rather not.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) The buffer will be independent of whether the table is versioned or not. If the table is not materialized, it will be materialized as versioned. It might make sense to do a follow-up KIP where we force the retention period of the versioned store to be greater than whatever the max of the stream buffer is.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Victoria
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) Yes, records will exit in timestamp order, not in offset order.
> > >>>>>>>>>>>> 2) Late records will be dropped (late as in outside the grace period). From my understanding that is the point of a grace period, no? Doesn't the same thing happen with versioned stores?
> > >>>>>>>>>>>> 3) The segment store already has an observed stream time, and we advance based on that. That should only advance based on records that enter the store. So yes, only stream-side records. We could maybe do an improvement later to advance stream time from table side as well, but that might be debatable as we might get more late records. Anyway, I would rather have that as a separate discussion.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In-memory option? We can do that. For the buffer I plan to use the TimeOrderedKeyValueBuffer interface, which already has an in-memory implementation, so it would be simple.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I said more in my answer to Lucas's question. The concern I have with buffer configs or in-memory is complicating the interface. Also semantic guarantees, but in-memory shouldn't affect that.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) Fixed out-of-order vs late terminology in the KIP.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) I was referring to having a stream. So after this KIP we can have a buffered stream or a normal one. For the table we can use a versioned table or a normal table.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 3) Good call out.
> > >>>>>>>>>>>> I clarified this as "If the table side uses a materialized version store, it can store multiple versions of each record within its defined grace period." and modified the rest of the paragraph a bit.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 4) I get preserving offset ordering, but if the stream is buffered to join on timestamp instead of offset, doesn't it already seem like we care more about time in this case?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If we end up adding more options it might make sense to do this. Maybe offset order processing can be a follow up?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'll add a section for this in Rejected Alternatives. I think it makes sense to do something like this but maybe in a follow up.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 5) I hadn't thought about this. I suppose if they changed this in an upgrade the next record would either evict a lot of records (if the grace period decreased) or there would be a pause until the new grace period is reached. Increasing is a bit more problematic, especially if the table grace period and retention time stay the same. If the data is reprocessed after a change like that then there would be different results, but I feel like that would be expected after such a change.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> What do you think should happen?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hopefully this answers your questions!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Walker
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP! Also some questions/comments from my side:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 10) Notation: you use the term "late data" but I think you mean out-of-order. We reserve the term "late" for records that arrive after the grace period has passed, and thus, "late == out-of-order data that is dropped".
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 20) "There is only one option from the stream side and only recently is there a second option on the table side."
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What are those options? Victoria already asked about the table side, but I am also not sure what option you mean for the stream side?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 30) "If the table side uses a materialized version store the value is the latest by stream time rather than by offset within its defined grace period."
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The phrase "the value is the latest by stream time" is confusing -- in the end, a versioned store holds multiple versions, not just one.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 40) I am also wondering about ordering. In general, KS tries to preserve offset-order during processing (with some exceptions, when offset order preservation is not clearly defined).
> > >>>>>>>>>>>>> Given that the stream-side buffer is really just a "linear buffer", we could easily preserve offset-order. But I also see a benefit of re-ordering and emitting out-of-order data right away when read (instead of blocking them behind in-order records that are not ready yet). -- It might even be a possibility to let users pick an emit strategy, e.g. "EmitStrategy.preserveOffsets" (name just a placeholder).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The KIP should explain this in more detail and also discuss different options and mention them in "Rejected alternatives" in case we don't want to include them.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 50) What happens when users change the grace period? Especially, when they turn it on/off (but also increasing/decreasing is an interesting point)? I think we should try to support this if possible; the "Compatibility" section needs to cover switching on/off in more detail.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 5/2/23 2:06 PM, Victoria Xia wrote:
> > >>>>>>>>>>>>>> Cool KIP, Walker! Thanks for sharing this proposal.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> A few clarifications:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. Is the order that records exit the buffer in necessarily the same as the order that records enter the buffer in, or no?
> > >>>>>>>>>>>>>> Based on the description in the KIP, it sounds like the answer is no, i.e., records will exit the buffer in increasing timestamp order, which means that they may be reordered (even for the same key) compared to the input order.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2. What happens if the join grace period is nonzero, and a stream-side record arrives with a timestamp that is older than the current stream time minus the grace period? Will this record trigger a join result, or will it be dropped? Based on the description for what happens when the join grace period is set to zero, it sounds like the late record will be dropped, even if the join grace period is nonzero. Is that true?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3. What could cause stream time to advance, for purposes of removing records from the join buffer? For example, will new records arriving on the table side of the join cause stream time to advance? From the KIP it sounds like only stream-side records will advance stream time -- does that mean that the join processor itself will have to track this stream time?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Also +1 to Lucas's question about what options will be available for configuring the join buffer.
> > >>>>>>>>>>>>>> Will users have the option to choose whether they want the buffer to be in-memory vs persistent?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - Victoria
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Walker,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> thanks for the KIP! We definitely need this. I have two questions:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>   - Have you considered allowing the customization of the underlying buffer implementation? As far as I can see, `StreamJoined` lets you customize the underlying store via a `WindowStoreSupplier`. Would it make sense for `Joined` to have this as well? I can imagine one may want to limit the number of records in the buffer, for example. If we hit the maximum, the only option would be to drop semantic guarantees, but users may still want to do this.
> > >>>>>>>>>>>>>>>   - With "second option on the table side" you are referring to versioned tables, right? Will the buffer on the stream side behave any differently whether the table side is versioned or not?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Finally, I think a simple example in the motivation section could help non-experts understand the KIP.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>> Lucas
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson <wcarl...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hello everybody,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I have a stream proposal to improve the stream table join by adding a grace period and buffer to the stream side of the join to allow processing in timestamp order matching the recent improvements of the versioned tables.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw> and share your thoughts.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> best,
> > >>>>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
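
For readers following the thread, the buffer semantics discussed above (records exit in timestamp order, only stream-side records advance stream time, and records older than stream time minus the grace period are dropped) can be sketched in plain Java. This is only an illustration under stated assumptions, not the Kafka Streams implementation: the class `GraceBufferSketch`, its `add` method, and the exact boundary conditions (drop when `timestamp < streamTime - grace`, evict when `timestamp <= streamTime - grace`) are hypothetical and chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a stream-side join buffer; not Kafka Streams code. */
final class GraceBufferSketch {
    private final long graceMs;
    // Highest timestamp seen so far on the stream side (the "observed stream time").
    private long streamTime = Long.MIN_VALUE;
    // Buffered values grouped by timestamp; TreeMap keeps them in timestamp order.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Adds one stream-side record and returns the records evicted as a result. */
    List<String> add(String value, long timestamp) {
        List<String> emitted = new ArrayList<>();
        // Assumed rule: a record older than streamTime - grace is late and dropped.
        if (streamTime != Long.MIN_VALUE && timestamp < streamTime - graceMs) {
            return emitted;
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        streamTime = Math.max(streamTime, timestamp);
        // Evict everything whose grace period has elapsed, in timestamp order.
        Map<Long, List<String>> ready = buffer.headMap(streamTime - graceMs, true);
        for (List<String> values : ready.values()) {
            emitted.addAll(values);
        }
        ready.clear(); // the view is backed by the buffer, so this removes the entries
        return emitted;
    }

    public static void main(String[] args) {
        GraceBufferSketch buf = new GraceBufferSketch(10);
        System.out.println(buf.add("a", 100)); // buffered, nothing evicted yet
        System.out.println(buf.add("b", 95));  // out of order but within grace
        System.out.println(buf.add("c", 80));  // older than 100 - 10, dropped
        System.out.println(buf.add("d", 106)); // evicts "b" (timestamp 95)
        System.out.println(buf.add("e", 120)); // evicts "a", then "d"
    }
}
```

In this sketch "b" leaves the buffer before "a" even though it arrived later, which is the reordering relative to offset order raised in the questions above. In the real join, each evicted record would be the point at which the table lookup happens.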