Thanks for your comments, Matthias!

> For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> call now might return `null` if the history retention of the store is
> too short.

Great callout -- I agree we should definitely clarify this in the KIP and mention it in the eventual docs as well. When a call to `get(k, ts)` returns null, there's not really a good way to distinguish whether it's because the timestamp is outside of the store's history retention or if it's because there's actually no record version for the key at the specified timestamp. Determining this from the processor would require (1) exposing the store's history retention to the processor, and (2) reconciling the fact that state stores today (including the new versioned store implementation) track their own observed stream time separate from processor time. In light of this, I think your proposal to treat a null from `get(k, ts)` due to history retention having been exceeded the same as we'd treat any other null makes sense, and is also our only viable option right now. I'll call this out in the docs so users are aware that their choice of history retention has this implication.

> For left-table-table joins, there seems to be no special impact, but it
> should be called out, too. The lookup itself does not go into the
> history of the table so no change here (as we don't have the "query
> older than history case")

Yup, we're on the same page. Using a versioned store for table-table joins results in the semantic change that the join result will include the latest-by-timestamp record rather than the latest-by-offset record, but no timestamped lookups (i.e., `get(key, ts)` calls) are used in the process so there is no concern about history retention having elapsed and affecting join results. (The only implication of history retention for this use case is indirect, since history retention doubles as grace period for the store. Because grace period is per store instance, which has task-level granularity, that means if grace period is set too low then the latest record for one key could be dropped from the store if another key has already advanced the store's observed stream time past the grace period by the time that this record is seen.)

I will update the KIP with these additional notes.

Thanks,
Victoria

On Wed, Mar 15, 2023 at 7:16 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP! Great to see a first step towards using the new
> versioned stores!
>
> I think the described tradeoffs make sense and I like to make a pragmatic
> step into the right direction, and avoid boiling the ocean. Thus, I
> agree to the proposed solution.
>
> One minor thing, that I believe just needs clarification in the KIP (does
> not seem to be a change to the KIP itself):
>
> For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> call now might return `null` if the history retention of the store is
> too short. For inner-joins it would result in no output record (ie,
> stream input record is dropped). Would be good to have it mentioned in
> the KIP explicitly.
>
> We should also discuss how left-joins should work for this case. I think
> it's ok (better) to include the stream record in the result if the
> lookup returns `null` -- either because no key exists in the existing
> history for the provided timestamp, or (the actual case in question)
> because we query older than available history. If you agree, can we add
> this to the KIP?
>
> For left-table-table joins, there seems to be no special impact, but it
> should be called out, too. The lookup itself does not go into the
> history of the table so no change here (as we don't have the "query
> older than history case") -- and for out-of-order records, we just
> "drop" them anyway, so no change for left-joins either I believe.
>
>
> -Matthias
>
>
> On 3/15/23 2:00 PM, Guozhang Wang wrote:
> > Sounds good to me. Thanks!
> >
> > On Wed, Mar 15, 2023 at 12:07 PM Victoria Xia
> > <victoria....@confluent.io.invalid> wrote:
> >>
> >> Thanks for kicking off the discussion, John and Guozhang!
> >>
> >>> Just one thing that might be out of scope: if users want to enable the
> >>> versioned table feature across the topology, should we allow them to do it
> >>> via a single config rather than changing the materialized object at each
> >>> place?
> >>
> >> Yes, I think this would be a great usability improvement and am in favor of
> >> introducing such a config. As long as the config defaults to using
> >> unversioned stores (which makes sense anyway), there will be no
> >> compatibility concerns with introducing the config in a future release.
> >> It's out of scope for this particular KIP as a result, but can hopefully be
> >> introduced as part of the next release after 3.5.
> >>
> >> Best,
> >> Victoria
> >>
> >> On Wed, Mar 15, 2023 at 10:49 AM Guozhang Wang <guozhang.wang...@gmail.com>
> >> wrote:
> >>
> >>> Thanks Victoria for the great writeup, with a thorough analysis and
> >>> trade-offs. I do not have any major questions about the proposal.
> >>>
> >>> Just one thing that might be out of scope: if users want to enable the
> >>> versioned table feature across the topology, should we allow them to
> >>> do it via a single config rather than changing the materialized object
> >>> at each place? Maybe we can defer that for future discussions, but
> >>> just want to hear your thoughts.
> >>>
> >>> Anyways, I think this proposal is great just as-is even if we agree to
> >>> do the configuration improvement later.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Thu, Mar 9, 2023 at 7:52 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>
> >>>> Thanks for the KIP, Victoria!
> >>>>
> >>>> I had some questions/concerns, but you addressed them in the Rejected
> >>>> Alternatives section. Thanks for the thorough proposal!
> >>>>
> >>>> -John
> >>>>
> >>>> On Thu, Mar 9, 2023, at 18:59, Victoria Xia wrote:
> >>>>> Hi everyone,
> >>>>>
> >>>>> I have a proposal for updating Kafka Streams's stream-table join and
> >>>>> table-table join semantics for the new versioned key-value state stores
> >>>>> introduced in KIP-889
> >>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>.
> >>>>> Would love to hear your thoughts and suggestions.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>>>
> >>>>> Thanks,
> >>>>> Victoria
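
For readers who want the `get(k, ts)` behavior from the top of this thread in concrete form, here is a rough, self-contained sketch. It does not use the Kafka Streams API: `VersionedJoinSketch`, `SimpleVersionedStore`, `innerJoin`, and `leftJoin` are hypothetical names made up for illustration, and the store is a plain in-memory map. It only assumes what the thread describes: a lookup older than the history retention returns null just like a missing key, an inner join drops the stream record on a null lookup, a left join still emits it, and history retention doubles as the grace period for writes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is NOT the Kafka Streams versioned store API.
final class VersionedJoinSketch {

    /** Minimal in-memory stand-in for a versioned store with a history retention. */
    static final class SimpleVersionedStore {
        private final long historyRetentionMs;
        private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
        // The store tracks its own observed stream time (assumes non-negative timestamps).
        private long observedStreamTime = 0L;

        SimpleVersionedStore(long historyRetentionMs) {
            this.historyRetentionMs = historyRetentionMs;
        }

        void put(String key, String value, long timestamp) {
            // Simplified grace period: history retention doubles as grace period,
            // so a record older than the retained window is dropped.
            if (timestamp < observedStreamTime - historyRetentionMs) {
                return;
            }
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        /** Returns the value valid at asOfTimestamp, or null. */
        String get(String key, long asOfTimestamp) {
            // Older than available history: indistinguishable from "no such version".
            if (asOfTimestamp < observedStreamTime - historyRetentionMs) {
                return null;
            }
            TreeMap<Long, String> history = versions.get(key);
            if (history == null) {
                return null;
            }
            Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
            return entry == null ? null : entry.getValue();
        }
    }

    /** Inner join: a null lookup means no output (the stream record is dropped). */
    static String innerJoin(SimpleVersionedStore table, String key, String streamValue, long ts) {
        String tableValue = table.get(key, ts);
        return tableValue == null ? null : streamValue + "+" + tableValue;
    }

    /** Left join: a null lookup still emits the stream record, paired with null. */
    static String leftJoin(SimpleVersionedStore table, String key, String streamValue, long ts) {
        return streamValue + "+" + table.get(key, ts);
    }

    public static void main(String[] args) {
        SimpleVersionedStore table = new SimpleVersionedStore(10L);
        table.put("k", "v1", 100L);
        table.put("k", "v2", 120L); // stream time is now 120; history reaches back to 110

        System.out.println(leftJoin(table, "k", "s", 125L));  // within retention
        System.out.println(innerJoin(table, "k", "s", 105L)); // older than history: dropped
        System.out.println(leftJoin(table, "k", "s", 105L));  // older than history: still emitted
    }
}
```

Note that the caller cannot tell from the null alone which of the two cases occurred, which is why both joins treat them identically in this sketch.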