Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've
just sent a message to start the vote on this KIP. Please have a look when
you get the chance.

Thanks,
Victoria

On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for clarifying about the null-question. SGTM.
>
> On 12/13/22 3:06 PM, Victoria Xia wrote:
> > Hi Matthias,
> >
> > Thanks for chiming in! Barring objections from anyone on this thread, I
> > will start the vote for this KIP on Thursday. That should be enough time
> > to incorporate any lingering minor changes.
> >
> >> I slightly prefer to add `VersionedRecord` interface (also
> >> like the name). I agree that it's low overhead and providing a clean
> >> path forward for future changes seems worth it to me.
> >
> > OK, that makes two of us. I updated the KIP just now to formally include
> > VersionedRecord as the new return type from the various
> > VersionedKeyValueStore methods.
> >
> >> if we introduce `VersionedRecord`, I think we can keep the not-null
> >> requirement for `ValueAndTimestamp`
> >
> > Not quite. VersionedRecord is only used as a return type from read
> > methods, which is why VersionedRecord is able to enforce that its value
> > is never null. If the value being returned would have been null, then we
> > return a null VersionedRecord instead, rather than a non-null
> > VersionedRecord with null value. So, there's no use case for a
> > VersionedRecord with null value.
> >
> > In contrast, even though ValueAndTimestamp is not anywhere in the public
> > VersionedKeyValueStore interface, ValueAndTimestamp still needs to be
> > used internally when representing a versioned key-value store as a
> > TimestampedKeyValueStore, since TimestampedKeyValueStore is used
> > everywhere throughout the internals of the codebase. In order to
> > represent a versioned key-value store as a TimestampedKeyValueStore, we
> > have to support `put(K key, ValueAndTimestamp<V> value)`, which means
> > ValueAndTimestamp needs to support null value (with timestamp). Otherwise
> > we cannot put a tombstone into a versioned key-value store when using the
> > internal TimestampedKeyValueStore representation.
> >
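The read-path rule described above can be sketched roughly as follows. This is only an illustration, not the KIP's code: `SketchVersionedRecord` and `SketchReadPath` are hypothetical names, the store holds versions for a single key, and a null value stands in for a tombstone.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical stand-in for the VersionedRecord return type; not the real Kafka Streams class.
final class SketchVersionedRecord<V> {
    private final V value;
    private final long timestamp;

    SketchVersionedRecord(V value, long timestamp) {
        // The value is never null: "no record" is expressed by returning a null record instead.
        this.value = Objects.requireNonNull(value, "value must not be null");
        this.timestamp = timestamp;
    }

    V value() { return value; }
    long timestamp() { return timestamp; }
}

// Hypothetical single-key store: versions ordered by timestamp, null value = tombstone.
final class SketchReadPath {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    // Writes may carry a null value (a tombstone), which is why the write-side type must allow null.
    void put(String value, long timestamp) {
        versions.put(timestamp, value);
    }

    // Returns the version valid as of the (inclusive) timestamp, or a null record
    // if there is no such version or that version is a tombstone.
    SketchVersionedRecord<String> get(long asOfTimestamp) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) {
            return null;
        }
        return new SketchVersionedRecord<>(entry.getValue(), entry.getKey());
    }
}
```

With versions written at timestamps 10 (a value) and 20 (a tombstone), `get(15)` returns a record with timestamp 10, while `get(5)` and `get(25)` both return null rather than a record wrapping a null value.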
> > It's very much an implementation detail that ValueAndTimestamp needs to
> > be relaxed to allow null values. I think this is a minor enough change
> > that it is still preferable to the alternatives (refactoring the
> > processors to not require TimestampedKeyValueStore, or introducing a
> > separate workaround `put()` method on the TimestampedKeyValueStore
> > representation of versioned key-value stores), so I have left it in as
> > part of the KIP.
> >
> > Best,
> > Victoria
> >
> > On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks Victoria.
> >>
> >> I did not re-read the KIP in full on the wiki but only your email.
> >>
> >> Points (1)-(8) SGTM.
> >>
> >> About (9): I slightly prefer to add `VersionedRecord` interface (also
> >> like the name). I agree that it's low overhead and providing a clean
> >> path forward for future changes seems worth it to me. Btw: if we
> >> introduce `VersionedRecord`, I think we can keep the not-null
> >> requirement for `ValueAndTimestamp`, which seems a small side benefit.
> >> (Btw: your code snippet in the KIP shows that `VersionedRecord` would
> >> have a non-null requirement for the value, but I think it would need to
> >> allow null as value?)
> >>
> >>
> >> -Matthias
> >>
> >> On 12/7/22 5:23 PM, Victoria Xia wrote:
> >>> Thanks for the discussion, Bruno, Sagar, and Matthias!
> >>>
> >>> It seems we've reached consensus on almost all of the discussion
> points.
> >>> I've updated the KIP with the following:
> >>> 1) renamed "timestampTo" in `get(key, timestampTo)` to "asOfTimestamp"
> to
> >>> clarify that this timestamp bound is inclusive, per the SQL guideline
> >> that
> >>> "AS OF <timestamp>" queries are inclusive. In the future, if we want to
> >>> introduce a timestamp range query, we can use `get(key, timestampFrom,
> >>> timestampTo)` and specify that timestampTo is exclusive in this method,
> >>> while avoiding confusion with the inclusive asOfTimestamp parameter in
> >> the
> >>> other method, given that the names are different.
> >>> 2) added a description of "history retention" semantics into the
> >>> VersionedKeyValueStore interface Javadoc, and updated the Javadoc for
> >>> `get(key, asOfTimestamp)` to mention explicitly that a null result is
> >>> returned if the provided timestamp bound is not within history
> retention.
> >>> 3) added a `delete(key, timestamp)` method (with return type
> >>> `ValueAndTimestamp<V>`) to the VersionedKeyValueStore interface.
> >>> 4) updated the Javadoc for `segmentInterval` to clarify that the only
> >>> reason a user might be interested in this parameter is performance.
> >>>
> >>> Other points we discussed which did not result in updates include:
> >>> 5) whether to automatically update the `min.compaction.lag.ms` config
> on
> >>> changelog topics when history retention is changed -- there's support
> for
> >>> this but let's not bundle it with this KIP. We can have a separate KIP
> to
> >>> change this behavior for the existing windowed changelog topics, in
> >>> addition to versioned changelog topics.
> >>> 6) should we expose segmentInterval in this KIP -- let's go ahead and
> >>> expose it now since we'll almost certainly expose it (in this same
> >> manner)
> >>> in a follow-on KIP anyway, and so that poor performance for user
> >> workloads
> >>> is less likely to be a barrier for users getting started with this
> >> feature.
> >>> I updated the Javadoc for this parameter to clarify why the Javadoc
> >>> mentions performance despite Javadocs typically not doing so.
> >>> 7) `get(timestampFrom, timestampTo)` and other methods for IQ -- very
> >>> important but deferred to a future KIP
> >>> 8) `purge(key)`/`deleteAllVersions(key)` -- deferred to a future KIP
> >>>
> >>> That leaves only one unresolved discussion point:
> >>> 9) whether to include validTo in the return types from `get(...)`. If
> we
> >> go
> >>> with the current proposal of not including validTo in the return type,
> >> then
> >>> it will not be easy to add it in the future (unless we want to add
> >> validTo
> >>> to ValueAndTimestamp, which feels odd to me). If we think we might want
> >> to
> >>> have validTo in the future, we can change the return type of `get(...)`
> >> and
> >>> `delete(...)` in this proposal from `ValueAndTimestamp<V>` to a new
> type,
> >>> e.g., `VersionedRecord<V>` or `RecordVersion<V>`, which today will look
> >> the
> >>> same as `ValueAndTimestamp<V>` but in the future we can add validTo if
> we
> >>> want. The cost is a new type which today looks the same as
> >>> ValueAndTimestamp.
> >>>
> >>> Now that I think about it more, the cost to introducing a new type
> seems
> >>> relatively low. I've added a proposal towards the bottom of the KIP
> here
> >>> <
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores#KIP889:VersionedStateStores-Additionalreturntimestampsfromget(key,asOfTimestamp)
> >>> .
> >>> If others also believe that the cost of introducing this new interface
> is
> >>> low (particularly relative to the flexibility it provides us for being
> >> able
> >>> to evolve the class in the future), I will incorporate this proposal
> into
> >>> the KIP. I think the hardest part of this will be deciding on a name
> for
> >>> the new class :)
> >>>
> >>> Pending objections, I'd like to make a call on item (9) and call a vote
> >> on
> >>> this KIP at the end of this week.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
> >>> On Thu, Dec 1, 2022 at 9:47 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>
> >>>> Thanks Victoria!
> >>>>
> >>>> (1) About `ReadOnlyVersionedKeyValueStore` -- I am not sure about IQv1
> >>>> vs IQv2. But you might be right that adding the interface later might
> >>>> not be an issue -- so it does not matter. Just wanted to double check.
> >>>>
> >>>>
> >>>>
> >>>> (2) About `delete(key, ts)` -- as already discussed, I agree that it
> >>>> should have same semantics as `put(key, null, ts)` (delete() needs a
> >>>> timestamp). Not sure if `delete()` really needs to return anything? I
> >>>> would be ok to make it `void` -- but I think it's also semantically
> >>>> sound if it returns the "old" value at timestamps `ts` that the delete
> >>>> actually deleted, as you mentioned -- in the end, a "delete" is a
> >>>> physical append anyway (ie, "soft delete") as we want to track
> history.
> >>>>
> >>>>
> >>>>
> >>>> (3)
> >>>>> Ah, great question. I think the question boils down to: do we want to
> >>>>> require that all versioned stores (including custom user
> >> implementations)
> >>>>> use "history retention" to determine when to expire old record
> >> versions?
> >>>>
> >>>> I personally think, yes. The main reason for this is, that I think we
> >>>> need to have a clear contract so we can plug-in custom implementations
> >>>> into the DSL later? -- I guess, having a stricter contract initially,
> >>>> and relaxing it later if necessary, is the easier way forward than
> the
> >>>> other way around.
> >>>>
> >>>> For PAPI users, they are not bound to implement the interface anyway
> and
> >>>> can just add any store they like by extending the top level
> `StateStore`
> >>>> interface.
> >>>>
> >>>>
> >>>>
> >>>> (4) About `segmentInterval`: I am personally fine both ways. Seems
> it's
> >>>> your call to expose it or not. It seems there is a slight preference
> to
> >>>> expose it.
> >>>>
> >>>>
> >>>>
> >>>> (5) About `validTo`: based on my experience, it's usually simpler to
> >>>> have it exclusive. It's also how it's defined in "system versioned
> >>>> temporal tables" in the SQL standard, and how `AS OF <ts>` queries
> work.
> >>>>
> >>>> For a join, it of course implies that if a table record has [100,200)
> as
> >>>> inclusive `validFrom=100` and exclusive `validTo=200` it would only
> join
> >>>> with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200
> :)).
> >>>>
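As a small sketch of the interval rule in point (5) above: the table record is treated as valid on the half-open interval [validFrom, validTo). The class and method names here are hypothetical and only illustrate the comparison.

```java
// Hypothetical helper illustrating an inclusive validFrom and exclusive validTo.
final class ValidityIntervalSketch {
    // True if a stream-side record with timestamp streamTs falls inside [validFrom, validTo).
    static boolean joins(long validFrom, long validTo, long streamTs) {
        return validFrom <= streamTs && streamTs < validTo;
    }
}
```

For a table record with `validFrom=100` and `validTo=200`, `joins(100, 200, 100)` and `joins(100, 200, 199)` are true, while `joins(100, 200, 200)` is false.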
> >>>> I would strongly advocate to make the upper bound exclusive (it did
> >>>> serve us well in the past to align to SQL semantics). It must be
> clearly
> >>>> documented of course and we can also name variable accordingly if
> >>>> necessary.
> >>>>
> >>>>
> >>>>
> >>>> (6) About including `validTo` in return types -- it's not easy to
> change
> >>>> the return type, because the signature of a method is only determined
> by
> >>>> its name and input parameter types, ie, we cannot overload an existing
> >>>> method to just change the return type, but would need to change its
> name
> >>>> or parameter list... Not sure if we can or cannot add `validTo` to
> >>>> `ValueAndTimestamp` though, but it's a tricky question. Would be good
> to
> >>>> get some more input from others if we think that it would be important
> >>>> enough to worry about it now or not.
> >>>>
> >>>>
> >>>>
> >>>> (7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would prefer
> >>>> to just keep `get()` with two overloads and not add `getAsOf()`; the
> >>>> fact that we pass in a timestamp implies we have a point in time
> query.
> >>>> (It's cleaner API design to leverage method overloads IMHO, and it's
> >>>> what we did in the past). Of course, we can name the parameter
> `get(key,
> >>>> asOfTimestamp)` if we think it's helpful. And in alignment to have
> >>>> `validTo` exclusive, `validTo` would be `asOfTimestamp+1` (or
> larger),
> >>>> in case we return it.
> >>>>
> >>>>
> >>>>
> >>>> (8) About updating topic config (ie, history retention and compaction
> >>>> lag): I think it was actually some oversight to not update topic
> >>>> configs if the code changes. There is actually a Jira ticket about
> it. I
> >>>> would prefer to keep the behavior consistent though and not change it
> >>>> just for the new versioned-store, but change it globally in one shot
> >>>> independent of this KIP.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 12/1/22 10:15 AM, Sagar wrote:
> >>>>> Thanks Victoria,
> >>>>>
> >>>>> I guess an advantage of exposing a method like delete(key, timestamp)
> >>>> could
> >>>>> be that from a user's standpoint, it is a single operation and not 2.
> >> The
> >>>>> equivalent of this method, i.e., put followed by get, is not atomic so
> >>>> exposing
> >>>>> it certainly sounds like a good idea.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
> >>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>
> >>>>>> Thanks, Sagar and Bruno, for your insights and comments!
> >>>>>>
> >>>>>>> Sagar: Can we name according to the semantics that you want to
> >>>>>> support like `getAsOf` or something like that? I am not sure if we
> do
> >>>> that
> >>>>>> in our codebase though. Maybe the experts can chime in.
> >>>>>>
> >>>>>> Because it is a new method that will be added, we should be able to
> >>>> name it
> >>>>>> whatever we like. I agree `getAsOf` is more clear, albeit wordier.
> >>>>>> Introducing `getAsOf(key, timestamp)` means we could leave open
> >>>> `get(key,
> >>>>>> timeFrom, timeTo)` to have an exclusive `timeTo` without
> introducing a
> >>>>>> collision. (We could introduce `getBetween(key, timeFrom, timeTo)`
> >>>> instead
> >>>>>> to delineate even more clearly, though this is better left for a
> >> future
> >>>>>> KIP.)
> >>>>>>
> >>>>>> I don't think there's any existing precedent in the codebase to follow
> >> here
> >>>> but
> >>>>>> I'll leave that to the experts. Curious to hear what others prefer
> as
> >>>> well.
> >>>>>>
> >>>>>>> Sagar: With delete, we would still keep the older versions of the
> key
> >>>>>> right?
> >>>>>>
> >>>>>> We could certainly choose this for the semantics of delete(...) --
> and
> >>>> it
> >>>>>> sounds like we should too, based on Bruno's confirmation below that
> >> this
> >>>>>> feels more natural to him as well -- but as Bruno noted in his
> message
> >>>>>> below I think we'll want the method signature to be `delete(key,
> >>>>>> timestamp)` then, so that there is an explicit timestamp to
> associate
> >>>> with
> >>>>>> the deletion. In other words, `delete(key, timestamp)` has the same
> >>>> effect
> >>>>>> as `put(key, null, timestamp)`. The only difference is that the
> >>>> `put(...)`
> >>>>>> method has a `void` return type, while `delete(key, timestamp)` can
> >> have
> >>>>>> `ValueAndTimestamp` as return type in order to return the record
> which
> >>>> is
> >>>>>> replaced (if any). In other words, `delete(key, timestamp)` is
> >>>> equivalent
> >>>>>> to `put(key, null, timestamp)` followed by `get(key, timestamp)`.
> >>>>>>
> >>>>>>> Bruno: I would also not change the semantics so that it deletes all
> >>>>>> versions of
> >>>>>> a key. I would rather add a new method purge(key) or
> >>>>>> deleteAllVersions(key) or similar if we want to have such a method
> in
> >>>>>> this first KIP.
> >>>>>>
> >>>>>> Makes sense; I'm convinced. Let's defer
> >>>>>> `purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's
> >>>> agreement
> >>>>>> that `delete(key, timestamp)` (as described above) is valuable, we
> can
> >>>> keep
> >>>>>> it in this first KIP even though it is syntactic sugar. If this
> turns
> >>>> into
> >>>>>> a larger discussion, we can defer this to a future KIP as well.
> >>>>>>
> >>>>>>> Bruno: I would treat the history retention as a strict limit. [...]
> >> You
> >>>>>> could also add historyRetentionMs() to the VersionedKeyValueStore<K,
> >> V>
> >>>>>> interface to make the concept of the history retention part of the
> >>>>>> interface.
> >>>>>>
> >>>>>> OK. That's the second vote for rewording the javadoc for
> >>>>>> `VersionedKeyValueStore#get(key, timestampTo)` to remove the
> >>>> parenthetical
> >>>>>> and clarify that history retention should be used to dictate this
> >> case,
> >>>> so
> >>>>>> I'll go ahead and do that. I'll leave out adding
> >> `historyRetentionMs()`
> >>>> to
> >>>>>> the interface for now, though, for the sake of consistency with
> other
> >>>>>> stores (e.g., window stores) which don't expose similar types of
> >>>>>> configurations from their interfaces.
> >>>>>>
> >>>>>>> Bruno: exclusive vs inclusive regarding validTo timestamp in get().
> >>>>>> Doesn't this decision depend on the semantics of the join for which
> >> this
> >>>>>> state store should be used?
> >>>>>>
> >>>>>> Yes, you are correct. As a user I would expect that a stream-side
> >> record
> >>>>>> with the same timestamp as a table-side record _would_ produce a
> join
> >>>>>> result, which is consistent with the proposal for timestampTo to be
> >>>>>> inclusive. (FWIW I tried this out with a Flink temporal join just
> now
> >>>> and
> >>>>>> observed this result as well. Not sure where to look for other
> >>>> standards to
> >>>>>> validate this expectation.)
> >>>>>>
> >>>>>>> Bruno: If Streams does not update min.compaction.lag.ms during
> >>>>>> rebalances,
> >>>>>> users have to do it each time they change history retention in the
> >> code,
> >>>>>> right? That seems odd to me. What is the actual reason for not
> >> updating
> >>>>>> the config? How does Streams handle updates to windowed stores?
> >>>>>>
> >>>>>> Yes, users will have to update min.compaction.lag.ms for the
> >> changelog
> >>>>>> topic themselves if they update history retention in their code.
> This
> >> is
> >>>>>> consistent with what happens for window stores today: e.g., if a
> user
> >>>>>> updates grace period for a windowed aggregation, then they are
> >>>> responsible
> >>>>>> for updating retention.ms on their windowed changelog topic as
> well.
> >>>>>>
> >>>>>> I'm not familiar with the historical context around why this is the
> >>>> case --
> >>>>>> Matthias, do you know?
> >>>>>>
> >>>>>> My best guess is that Streams does not want to interfere with any
> >>>> potential
> >>>>>> out-of-band changes by the user between application restarts, though
> >> I'm
> >>>>>> not sure why a user would want to change this specific config to a
> >> value
> >>>>>> which does not accord with the specified history retention. I notice
> >>>> that
> >>>>>> there is code for validating topic configs and collecting validation
> >>>> errors
> >>>>>> (
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L318-L319
> >>>>>> )
> >>>>>> but this method is not called from anywhere, even though there are
> >> unit
> >>>>>> tests for it. I was unable to find history of this validation after
> a
> >>>> quick
> >>>>>> search. Hopefully Matthias (or others) has context, otherwise I will
> >>>> have a
> >>>>>> closer look.
> >>>>>>
> >>>>>> - Victoria
> >>>>>>
> >>>>>> On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks for the KIP, Victoria!
> >>>>>>>
> >>>>>>> I have a couple of comments.
> >>>>>>>
> >>>>>>> 1. delete(key)
> >>>>>>> I think delete(key) should not remove all versions of a key. We
> >> should
> >>>>>>> use it to close the validity interval of the last version.
> >>>>>>> Assuming we have records of different versions for key A:
> >>>>>>> (A, e, 0, 2),
> >>>>>>> (A, f, 2, 3),
> >>>>>>> (A, g, 3, MAX)
> >>>>>>>
> >>>>>>> delete(A) would update them to
> >>>>>>>
> >>>>>>> (A, e, 0, 2),
> >>>>>>> (A, f, 2, 3),
> >>>>>>> (A, g, 3, 5)
> >>>>>>> (A, null, 5, MAX)
> >>>>>>>
> >>>>>>> But then the question arises where does timestamp 5 that closes the
> >>>>>>> interval in (A, g, 3, 5) and opens the interval in (A, null, 5,
> MAX)
> >>>>>>> come from. We could use the timestamp at which delete(A) is called,
> >> but
> >>>>>>> actually I do not like that because it seems to me it opens the
> doors
> >>>> to
> >>>>>>> non-determinism. If we use event time for put() we should also use
> it
> >>>>>>> for delete(). Actually, put(A, null, 5) would have the same effect
> as
> >>>>>>> delete(A) in the example above. As a syntactical sugar, we could
> add
> >>>>>>> delete(key, validFrom). (I just realized now that I just repeated
> >> what
> >>>>>>> Victoria said in her previous e-mail.)
> >>>>>>> I agree with Victoria that delete(A) as defined for other state
> >> stores
> >>>>>>> is hard to re-use in the versioned key-value store.
> >>>>>>> I would also not change the semantics so that it deletes all
> versions
> >>>> of
> >>>>>>> a key. I would rather add a new method purge(key) or
> >>>>>>> deleteAllVersions(key) or similar if we want to have such a method
> in
> >>>>>>> this first KIP.
> >>>>>>>
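The example in point 1 above can be reproduced with a small in-memory sketch. Everything here is hypothetical (`SketchVersionedStore` is not the KIP's implementation): each key maps to its versions ordered by validFrom timestamp, validTo is derived from the next version's timestamp, and `delete(key, timestamp)` is written as `put(key, null, timestamp)` that additionally returns the value it replaced. For brevity it returns only the value, not a value-plus-timestamp type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory versioned store; a null value represents a tombstone.
final class SketchVersionedStore {
    private final Map<String, TreeMap<Long, String>> store = new HashMap<>();

    void put(String key, String value, long timestamp) {
        store.computeIfAbsent(key, k -> new TreeMap<Long, String>()).put(timestamp, value);
    }

    // Same effect as put(key, null, timestamp), but returns the value that was
    // valid at that timestamp before the delete (or null if there was none).
    String delete(String key, long timestamp) {
        TreeMap<Long, String> versions = store.get(key);
        Map.Entry<Long, String> previous =
            versions == null ? null : versions.floorEntry(timestamp);
        put(key, null, timestamp);
        return previous == null ? null : previous.getValue();
    }

    // Renders each version as (key, value, validFrom, validTo), where validTo is
    // the next version's timestamp, or MAX for the latest version.
    List<String> versions(String key) {
        List<String> result = new ArrayList<>();
        TreeMap<Long, String> versions = store.getOrDefault(key, new TreeMap<Long, String>());
        for (Map.Entry<Long, String> entry : versions.entrySet()) {
            Long next = versions.higherKey(entry.getKey());
            String validTo = next == null ? "MAX" : String.valueOf(next);
            result.add("(" + key + ", " + entry.getValue() + ", "
                + entry.getKey() + ", " + validTo + ")");
        }
        return result;
    }
}
```

After `put("A", "e", 0)`, `put("A", "f", 2)`, `put("A", "g", 3)` and `delete("A", 5)`, the delete returns `g` and `versions("A")` lists (A, e, 0, 2), (A, f, 2, 3), (A, g, 3, 5), (A, null, 5, MAX). Because the timestamp is passed in explicitly, the result does not depend on when the call is made.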
> >>>>>>>
> >>>>>>> 2. history retention
> >>>>>>> I would remove "(up to store implementation discretion when this is
> >> the
> >>>>>>> case)". I would treat the history retention as a strict limit. If
> >> users
> >>>>>>> want to implement a less strict behavior, they can still do it.
> Maybe
> >>>>>>> mention in the javadocs the implications of not adhering strictly
> to
> >>>> the
> >>>>>>> history retention. That is, the DSL might become non-deterministic.
> >> You
> >>>>>>> could also add historyRetentionMs() to the
> VersionedKeyValueStore<K,
> >> V>
> >>>>>>> interface to make the concept of the history retention part of the
> >>>>>>> interface.
> >>>>>>>
> >>>>>>> 3. null vs. exception for out-of-bound queries
> >>>>>>> I am in favor of null. The record version is not there anymore
> >> because
> >>>>>>> it expired. This seems to me normal and nothing exceptional. That
> >> would
> >>>>>>> also be consistent with the behavior of other APIs as already
> mentioned.
> >>>>>>>
> >>>>>>>
> >>>>>>> 4. Exposing segmentInterval
> >>>>>>> Since we have evidence that the segment interval affects
> >> performance, I
> >>>>>>> would expose it. But I find it also OK to expose it once we have a
> >>>>>>> corresponding metric.
> >>>>>>>
> >>>>>>> 5. exclusive vs inclusive regarding validTo timestamp in get()
> >>>>>>> Doesn't this decision depend on the semantics of the join for which
> >>>> this
> >>>>>>> state store should be used? Should a record on the table side that
> >> has
> >>>>>>> the same timestamp as the record on the stream side join? Or should
> >>>> only
> >>>>>>> records in the table that are strictly before the record on the
> >> stream
> >>>>>>> side join?
> >>>>>>>
> >>>>>>>
> >>>>>>> 6. Not setting min.compaction.lag.ms during rebalances
> >>>>>>> If Streams does not update min.compaction.lag.ms during
> rebalances,
> >>>>>>> users have to do it each time they change history retention in the
> >>>> code,
> >>>>>>> right? That seems odd to me. What is the actual reason for not
> >> updating
> >>>>>>> the config? How does Streams handle updates to windowed stores?
> That
> >>>>>>> should be a similar situation for the retention time config of the
> >>>>>>> changelog topic.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 23.11.22 09:11, Sagar wrote:
> >>>>>>>> Hi Vicky,
> >>>>>>>>
> >>>>>>>> Thanks for your response!
> >>>>>>>>
> >>>>>>>> I would just use numbers to refer to your comments.
> >>>>>>>>
> >>>>>>>> 1) Thanks for your response. Even I am not totally sure whether
> >> these
> >>>>>>>> should be supported via IQv2 or via store interface. That said, I
> >>>>>>> wouldn't
> >>>>>>>> definitely qualify this as blocking the KIP, so we can
> live
> >>>>>>>> without it :)
> >>>>>>>>
> >>>>>>>> 2) Yeah if the 2 APIs for get have different semantics for
> >>>> timestampTo,
> >>>>>>>> then it could be confusing. I went through the link for temporal
> >>>> tables
> >>>>>>>> (TFS!) and I now get why the AS OF semantics would have it
> >> inclusive.
> >>>> I
> >>>>>>>> think part of the problem is that the name get on its own is not
> as
> >>>>>>>> expressive as SQL. Can we name according to the semantics that you
> >>>> want
> >>>>>>> to
> >>>>>>>> support like `getAsOf` or something like that? I am not sure if we
> >> do
> >>>>>>> that
> >>>>>>>> in our codebase though. Maybe the experts can chime in.
> >>>>>>>>
> >>>>>>>> 3) hmm I would have named it `validUpto` But again not very picky
> >>>> about
> >>>>>>> it.
> >>>>>>>> After going through the link and your KIP, it's a lot clearer to
> me.
> >>>>>>>>
> >>>>>>>> 4) I think delete(key) should be sufficient. With delete, we would
> >>>>>>>> still keep the older versions of the key right?
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> Sagar.
> >>>>>>>>
> >>>>>>>> On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia
> >>>>>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks, Matthias and Sagar, for your comments! I've responded
> here
> >>>> for
> >>>>>>> now,
> >>>>>>>>> and will update the KIP afterwards with the outcome of our
> >>>> discussions
> >>>>>>> as
> >>>>>>>>> they resolve.
> >>>>>>>>>
> >>>>>>>>> ----------- Matthias's comments -----------
> >>>>>>>>>
> >>>>>>>>>> (1) Why does the new store not extend KeyValueStore, but
> >> StateStore?
> >>>>>>>>> In the end, it's a KeyValueStore?
> >>>>>>>>>
> >>>>>>>>> A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>`
> >>>>>> because
> >>>>>>>>> many of the KeyValueStore methods would not make sense for a
> >>>> versioned
> >>>>>>>>> store. For example, `put(K key, V value)` is not meaningful for a
> >>>>>>> versioned
> >>>>>>>>> store because the record needs a timestamp associated with it.
> >>>>>>>>>
> >>>>>>>>> A `VersionedKeyValueStore<K, V>` is more similar to a
> >>>>>> `KeyValueStore<K,
> >>>>>>>>> ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, V>`),
> >> but
> >>>>>>> some
> >>>>>>>>> of the TimestampedKeyValueStore methods are still problematic.
> For
> >>>>>>> example,
> >>>>>>>>> what does it mean for `delete(K key)` to have return type
> >>>>>>>>> `ValueAndTimestamp<V>`? Does this mean that `delete(K key)` only
> >>>>>> deletes
> >>>>>>>>> (and returns) the latest record version for the key? Probably we
> >> want
> >>>>>> a
> >>>>>>>>> versioned store to have `delete(K key)` delete all record
> versions
> >>>> for
> >>>>>>> the
> >>>>>>>>> given key, in which case the return type is better suited as an
> >>>>>>>>> iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
> >>>>>>>>> ValueAndTimestamp value)` also has ambiguous semantics for
> >> versioned
> >>>>>>> stores
> >>>>>>>>> (i.e., what does it mean for the key/record to be "absent").
> >>>>>>>>>
> >>>>>>>>> I agree that conceptually a versioned key-value store is just a
> >>>>>>> key-value
> >>>>>>>>> store, though. In the future if we redesign the store interfaces,
> >>>> it'd
> >>>>>>> be
> >>>>>>>>> great to unify them by having a more generic KeyValueStore
> >> interface
> >>>>>>> that
> >>>>>>>>> allows for extra flexibility to support different types of
> >> key-value
> >>>>>>>>> stores, including versioned stores. (Or, if you can think of a
> way
> >> to
> >>>>>>>>> achieve this with the existing interfaces today, I'm all ears!)
> >>>>>>>>>
> >>>>>>>>>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we
> >>>> don't
> >>>>>>>>> want to support IQ in this KIP, it might be good to add this
> >>>> interface
> >>>>>>>>> right away to avoid complications for follow up KIPs? Or won't
> >> there
> >>>>>> be
> >>>>>>>>> any complications anyway?
> >>>>>>>>>
> >>>>>>>>> I don't think there will be complications for refactoring to add
> >> this
> >>>>>>>>> interface in the future. Refactoring out
> >>>>>> ReadOnlyVersionedKeyValueStore
> >>>>>>>>> from VersionedKeyValueStore would leave VersionedKeyValueStore
> >>>>>> unchanged
> >>>>>>>>> from the outside.
> >>>>>>>>>
> >>>>>>>>> Also, is it true that the ReadOnlyKeyValueStore interface is only
> >>>> used
> >>>>>>> for
> >>>>>>>>> IQv1 and not IQv2? I think it's an open question as to whether we
> >>>>>> should
> >>>>>>>>> support IQv1 for versioned stores or only IQv2. If the latter,
> then
> >>>>>>> maybe
> >>>>>>>>> we won't need the extra interface at all.
> >>>>>>>>>
> >>>>>>>>>> (3) Why do we not have a `delete(key)` method? I am ok with not
> >>>>>>>>> supporting all methods from existing KV-store, but a
> `delete(key)`
> >>>>>> seems
> >>>>>>>>> to be fundamental to have?
> >>>>>>>>>
> >>>>>>>>> What do you think the semantics of `delete(key)` should be for
> >>>>>> versioned
> >>>>>>>>> stores? Should `delete(key)` delete (and return) all record
> >> versions
> >>>>>> for
> >>>>>>>>> the key? Or should we have `delete(key, timestamp)` which is
> >>>>>> equivalent
> >>>>>>> to
> >>>>>>>>> `put(key, null, timestamp)` except with a return type to return
> >>>>>>>>> ValueAndTimestamp representing the record it replaced?
> >>>>>>>>>
> >>>>>>>>> If we have ready alignment on what the interface and semantics
> for
> >>>>>>>>> `delete(key)` should be, then adding it in this KIP sounds good.
> I
> >>>>>> just
> >>>>>>>>> didn't want the rest of the KIP to be hung up over additional
> >>>>>>> interfaces,
> >>>>>>>>> given that we can always add extra interfaces in the future.
> >>>>>>>>>
> >>>>>>>>>> (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> >>>>>>>>> MAX_VALUE)`? Maybe is good to have as syntactic sugar though?
> Just
> >>>> for
> >>>>>>>>> my own clarification (should we add something to the JavaDocs?).
> >>>>>>>>>
> >>>>>>>>> Correct, it is just syntactic sugar. I will add a clarification
> >> into
> >>>>>> the
> >>>>>>>>> Javadocs as you've suggested.
> >>>>>>>>>
> >>>>>>>>>> (4b) Should we throw an exception if a user queries out-of-bound
> >>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>        -> You put it into "rejected alternatives", and I
> understand
> >>>> your
> >>>>>>>>> argument. Would love to get input from others about this question
> >>>>>>>>> though. -- It seems we also return `null` for windowed stores, so
> >>>>>> maybe
> >>>>>>>>> the strongest argument is to align to existing behavior? Or do we
> >>>> have
> >>>>>>>>> a case for which the current behavior is problematic?
> >>>>>>>>>
> >>>>>>>>> Sure; curious to hear what others think as well.
> >>>>>>>>>
> >>>>>>>>>> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> >>>>>>>>> discretion when this is the case)" -> Should we make it a
> stricter
> >>>>>>>>> contract such that the user can reason about it better (there is
> >> WIP
> >>>>>> to
> >>>>>>>>> make retention time a strict bound for windowed stores atm)
> >>>>>>>>>        -> JavaDocs on `persistentVersionedKeyValueStore` seems to
> >>>>>> suggest a
> >>>>>>>>> strict bound, too.
> >>>>>>>>>
> >>>>>>>>> Ah, great question. I think the question boils down to: do we
> want
> >> to
> >>>>>>>>> require that all versioned stores (including custom user
> >>>>>>> implementations)
> >>>>>>>>> use "history retention" to determine when to expire old record
> >>>>>> versions?
> >>>>>>>>>
> >>>>>>>>> Because the `persistentVersionedKeyValueStore(...)` method
> returns
> >>>>>>>>> instances of the provided RocksDB-based versioned store
> >>>>>> implementation,
> >>>>>>>>> which does use history retention for this purpose, that's why we
> >> can
> >>>>>>> very
> >>>>>>>>> clearly say that for this store, `get(key, ts)` will return null
> if
> >>>>>> the
> >>>>>>>>> provided timestamp bound has fallen out of history retention. The
> >>>>>>> reason I
> >>>>>>>>> left the `VersionedKeyValueStore#get(key, ts)` Javadoc more
> generic
> >>>>>>> (i.e.,
> >>>>>>>>> does not mention history retention) is because maybe a user
> >>>>>> implementing
> >>>>>>>>> their own custom store will choose a different expiry mechanism,
> >>>> e.g.,
> >>>>>>> keep
> >>>>>>>>> the three latest versions for each key regardless of how old the
> >>>>>>> timestamps
> >>>>>>>>> are.
> >>>>>>>>>
> >>>>>>>>> If we want to require that all versioned stores use history
> >> retention
> >>>>>> in
> >>>>>>>>> order to determine when to expire old records, then I will
> >> certainly
> >>>>>>> update
> >>>>>>>>> the Javadoc to clarify. This is already a requirement for DSL
> users
> >>>>>>> because
> >>>>>>>>> the VersionedBytesStoreSupplier interface requires history
> >> retention
> >>>>>> to
> >>>>>>> be
> >>>>>>>>> provided (in order for changelog topic configs to be properly
> set),
> >>>> so
> >>>>>>> it's
> >>>>>>>>> just a question of whether we also want to require PAPI users to
> >> use
> >>>>>>>>> history retention too. I had a look at the existing window stores
> >> and
> >>>>>>>>> didn't see precedent for requiring all window stores have a
> >> standard
> >>>>>>>>> "retention time" concept for how long to keep windows, but if we
> >> want
> >>>>>> to
> >>>>>>>>> have a standard "history retention" concept for versioned stores
> we
> >>>>>>>>> certainly can. WDYT?
> >>>>>>>>>
> >>>>>>>>>> (5a) Do we need to expose `segmentInterval`? For
> windowed-stores,
> >> we
> >>>>>>>>> also use segments but hard-code it to two (it was exposed in
> >> earlier
> >>>>>>>>> versions but it seems not useful, even if we would be open to
> >> expose
> >>>>>> it
> >>>>>>>>> again if there is user demand).
> >>>>>>>>>
> >>>>>>>>> If we want to leave it out of this first KIP (and potentially
> >> expose
> >>>>>> it
> >>>>>>> in
> >>>>>>>>> the future), that works for me. The performance benchmarks I ran
> >>>>>> suggest
> >>>>>>>>> that this parameter greatly impacts store performance though and
> is
> >>>>>> very
> >>>>>>>>> workload dependent. If a user reported poor performance using
> >>>>>> versioned
> >>>>>>>>> stores for their workload, this is the first parameter I would
> want
> >>>> to
> >>>>>>>>> tune. That said, metrics/observability for versioned stores
> (which
> >>>>>>> would be
> >>>>>>>>> helpful for determining how this parameter should be adjusted)
> have
> >>>>>> been
> >>>>>>>>> deferred to a follow-up KIP, so perhaps that's reason to defer
> >>>>>> exposing
> >>>>>>>>> this parameter as well.
> >>>>>>>>>
> >>>>>>>>>> (5b) JavaDocs says: "Performance degrades as more record
> versions
> >>>> for
> >>>>>>>>> the same key are collected in a single segment. On the other
> hand,
> >>>>>>>>> out-of-order writes and reads which access older segments may
> slow
> >>>>>> down
> >>>>>>>>> if there are too many segments." -- Wondering if JavaDocs should
> >> make
> >>>>>>>>> any statements about expected performance? Seems to be an
> >>>>>> implementation
> >>>>>>>>> detail?
> >>>>>>>>>
> >>>>>>>>> I included this sentence to explain why a user might want to tune
> >>>> this
> >>>>>>>>> value / help guide how to think about the parameter, but if we
> want
> >>>> to
> >>>>>>>>> remove it entirely (per the discussion point above) then this
> >> Javadoc
> >>>>>>> will
> >>>>>>>>> be removed with it.
> >>>>>>>>>
> >>>>>>>>>> (6) validTo timestamp is "exclusive", right? Ie, if I query
> >>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next" record
> >> v2
> >>>>>>>>> with validFromV2=ts?
> >>>>>>>>>
> >>>>>>>>> I actually intended for it to be inclusive (will update the KIP).
> >> Do
> >>>>>> you
> >>>>>>>>> think exclusive is more intuitive? The reason I had inclusive in
> my
> >>>>>>> mind is
> >>>>>>>>> because it's like a "AS OF <time>" query, which treats the time
> >> bound
> >>>>>> as
> >>>>>>>>> inclusive.
> >>>>>>>>>
> >>>>>>>>>> (7) The KIP says that segments are stored in the same RocksDB -- for
> >>>>>>>>> this case, how are efficient deletes handled? For windowed-store, we can
> >>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>
> >>>>>>>>> The way that multiple segments are represented in the same
> RocksDB
> >> is
> >>>>>>> that
> >>>>>>>>> the RocksDB keys are prefixed with segment ID. An entire segment
> is
> >>>>>>> deleted
> >>>>>>>>> with a single `deleteRange()` call to RocksDB.
> >>>>>>>>>
> >>>>>>>>>> (8) Rejected alternatives: you propose to not return the validTo
> >>>>>>>>> timestamp -- if we find it useful in the future to return it,
> would
> >>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>
> >>>>>>>>> With the current proposal, there's no clean path. If we think
> >> there's
> >>>>>> a
> >>>>>>>>> good chance we might want to do this in the future, then we
> should
> >>>>>>> update
> >>>>>>>>> the proposed interfaces.
> >>>>>>>>>
> >>>>>>>>> The current proposed return type from `VersionedKeyValueStore<K,
> >>>>>>>>> V>#get(key, tsTo)` is `ValueAndTimestamp<V>`. There's no way to
> >> add a
> >>>>>>>>> second timestamp into `ValueAndTimestamp<V>`, which is why
> there's
> >> no
> >>>>>>> clean
> >>>>>>>>> path to include validTo timestamp in the future under the
> existing
> >>>>>>>>> proposal.
> >>>>>>>>>
> >>>>>>>>> If we wanted to allow for including validTo timestamp in the
> >> future,
> >>>>>>> we'd
> >>>>>>>>> instead update the return type to be a new `VersionedRecord<V>`
> >>>>>> object.
> >>>>>>>>> Today a `VersionedRecord<V>` could just include `value` and
> >>>>>> `timestamp`,
> >>>>>>>>> and in the future we could add `validTo` (names subject to
> change)
> >>>>>> into
> >>>>>>> the
> >>>>>>>>> `VersionedRecord` as well. (It'd look a little strange for now
> >> since
> >>>>>>>>> VersionedRecord is the same as ValueAndTimestamp, but that seems
> >>>>>> fine.)
> >>>>>>>>>
> >>>>>>>>> If we choose to do this, I think we should also update the return type
> >>>>>>>>> of `VersionedKeyValueStore#get(key)` to be VersionedRecord as well,
> >>>>>>>>> rather than having one return ValueAndTimestamp while the other returns
> >>>>>>>>> VersionedRecord.
> >>>>>>>>>
> >>>>>>>>> ----------- Sagar's comments -----------
> >>>>>>>>>
> >>>>>>>>>> 1) Did you consider adding a method similar to:
> >>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>> I think this could be useful considering that this
> >>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>
> >>>>>>>>> Yes, I do think this method is valuable. I think we will
> definitely
> >>>>>>> want to
> >>>>>>>>> support time-range based queries at some point (hopefully soon),
> >> and
> >>>>>>> likely
> >>>>>>>>> also key-range based queries (to achieve feature parity with
> >> existing
> >>>>>>>>> key-value stores).
> >>>>>>>>>
> >>>>>>>>> It's not immediately clear to me whether these types of queries
> >>>> should
> >>>>>>> be
> >>>>>>>>> supported as part of the store interface or if they should only
> be
> >>>>>>>>> supported via the `query(...)` method for IQv2. (It's an open
> >>>> question
> >>>>>>> as
> >>>>>>>>> to whether we should support IQv1 for versioned stores or only
> >> IQv2.
> >>>> A
> >>>>>>>>> benefit of IQv2 over IQv1 is that we won't need to add individual
> >>>>>> store
> >>>>>>>>> methods for each type of query, including for all wrapped store
> >>>>>> layers.)
> >>>>>>>>>
> >>>>>>>>> If we have clear non-IQ use cases for these methods (e.g., use
> >> cases
> >>>>>>> within
> >>>>>>>>> processors), then they'll need to be added as part of the store
> >>>>>>> interface
> >>>>>>>>> for sure. I'm leaning towards adding them as part of the store interface
> >>>>>>>>> but given the ambiguity here, it may be preferable to defer to a follow-up
> >>>>>>>>> KIP. OTOH, if you think the versioned store interface as proposed in this
> >>>>>>>>> KIP is too bare bones to be useful, I'm open to adding it in now as well.
> >>>>>>>>>
> >>>>>>>>>> 2) I have a similar question as Matthias, about the timestampTo
> >>>>>>> argument
> >>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>
> >>>>>>>>> Same answer (and follow-up question) as above. Do you think it
> will
> >>>> be
> >>>>>>>>> confusing for `get(key, tsTo)` to use an inclusive time bound,
> >> while
> >>>>>>>>> `get(key, tsFrom, tsTo)` would use an exclusive tsTo time bound?
> >>>> Maybe
> >>>>>>> we
> >>>>>>>>> should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or
> >>>>>>>>> `getRange(...)` in order to avoid confusion.
> >>>>>>>>>
> >>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is essentially
> >> the
> >>>>>>>>> timestamp at which the record was inserted. validFrom makes it
> >> sound
> >>>>>>> like
> >>>>>>>>> validTo which can keep changing based on new records while *from*
> >> is
> >>>>>>> fixed.
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>> "It is essentially the timestamp at which the record was
> inserted"
> >>>> <--
> >>>>>>> Yes,
> >>>>>>>>> that's correct.
> >>>>>>>>>
> >>>>>>>>> I borrowed the "validFrom/validTo" terminology from temporal tables, e.g.,
> >>>>>>>>> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16
> >>>>>>>>> I don't believe the terms "validFrom" or "validTo" are currently
> >>>>>> exposed
> >>>>>>>>> anywhere in any of the user-facing interfaces (or Javadocs); I
> just
> >>>>>>> needed
> >>>>>>>>> a way to refer to the concepts in the KIP. Hopefully this is a
> >>>>>> non-issue
> >>>>>>>>> (at least for now) as a result. Do you have a suggestion for
> >>>>>> terminology
> >>>>>>>>> that would've been less confusing?
> >>>>>>>>>
> >>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>
> >>>>>>>>> Makes sense. It'd be great to get your input on the same follow-up
> >>>>>>>>> questions I asked Matthias above as well :)
> >>>>>>>>>
> >>>>>>>>> On Tue, Nov 22, 2022 at 4:25 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Victoria,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP. Seems like a very interesting idea!
> >>>>>>>>>>
> >>>>>>>>>> I have a couple of questions:
> >>>>>>>>>>
> >>>>>>>>>> 1) Did you consider adding a method similar to:
> >>>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>>>
> >>>>>>>>>> I think this could be useful considering that this
> >>>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>>
> >>>>>>>>>> 2) I have a similar question as Matthias, about the timestampTo
> >>>>>>> argument
> >>>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>>
> >>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is essentially
> >> the
> >>>>>>>>>> timestamp at which the record was inserted. validFrom makes it
> >> sound
> >>>>>>> like
> >>>>>>>>>> validTo which can keep changing based on new records while
> *from*
> >> is
> >>>>>>>>> fixed.
> >>>>>>>>>> WDYT?
> >>>>>>>>>>
> >>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Sagar.
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <mj...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP Victoria. Very well written!
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Couple of questions (many might just require to add some more
> >>>>>> details
> >>>>>>>>> to
> >>>>>>>>>>> the KIP):
> >>>>>>>>>>>
> >>>>>>>>>>>       (1) Why does the new store not extend KeyValueStore, but
> >>>>>>> StateStore?
> >>>>>>>>>>> In the end, it's a KeyValueStore?
> >>>>>>>>>>>
> >>>>>>>>>>>       (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> >>>>>>>>>>> want to support IQ in this KIP, it might be good to add this interface
> >>>>>>>>>>> right away to avoid complications for follow up KIPs? Or won't there be
> >>>>>>>>>>> any complications anyway?
> >>>>>>>>>>>
> >>>>>>>>>>>       (3) Why do we not have a `delete(key)` method? I am ok with not
> >>>>>>>>>>> supporting all methods from existing KV-store, but a `delete(key)` seems
> >>>>>>>>>>> to be fundamental to have?
> >>>>>>>>>>>
> >>>>>>>>>>>       (4a) Do we need `get(key)`? It seems to be the same as
> >>>> `get(key,
> >>>>>>>>>>> MAX_VALUE)`? Maybe is good to have as syntactic sugar though?
> >> Just
> >>>>>> for
> >>>>>>>>>>> my own clarification (should we add something to the
> JavaDocs?).
> >>>>>>>>>>>
> >>>>>>>>>>>       (4b) Should we throw an exception if a user queries
> >>>> out-of-bound
> >>>>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>>>        -> You put it into "rejected alternatives", and I
> >> understand
> >>>>>> your
> >>>>>>>>>>> argument. Would love to get input from others about this
> question
> >>>>>>>>>>> though. -- It seems we also return `null` for windowed stores, so maybe
> >>>>>>>>>>> the strongest argument is to align to existing behavior? Or do we have
> >>>>>>>>>>> a case for which the current behavior is problematic?
> >>>>>>>>>>>
> >>>>>>>>>>>       (4c) JavaDoc on `get(key,ts)` says: "(up to store
> >>>> implementation
> >>>>>>>>>>> discretion when this is the case)" -> Should we make it a
> >> stricter
> >>>>>>>>>>> contract such that the user can reason about it better (there
> is
> >>>> WIP
> >>>>>>> to
> >>>>>>>>>>> make retention time a strict bound for windowed stores atm)
> >>>>>>>>>>>        -> JavaDocs on `persistentVersionedKeyValueStore` seems
> to
> >>>>>>> suggest a
> >>>>>>>>>>> strict bound, too.
> >>>>>>>>>>>
> >>>>>>>>>>>       (5a) Do we need to expose `segmentInterval`? For
> >>>> windowed-stores,
> >>>>>>> we
> >>>>>>>>>>> also use segments but hard-code it to two (it was exposed in
> >>>> earlier
> >>>>>>>>>>> versions but it seems not useful, even if we would be open to
> >>>> expose
> >>>>>>> it
> >>>>>>>>>>> again if there is user demand).
> >>>>>>>>>>>
> >>>>>>>>>>>       (5b) JavaDocs says: "Performance degrades as more record
> >>>> versions
> >>>>>>> for
> >>>>>>>>>>> the same key are collected in a single segment. On the other
> >> hand,
> >>>>>>>>>>> out-of-order writes and reads which access older segments may
> >> slow
> >>>>>>> down
> >>>>>>>>>>> if there are too many segments." -- Wondering if JavaDocs
> should
> >>>>>> make
> >>>>>>>>>>> any statements about expected performance? Seems to be an
> >>>>>>>>> implementation
> >>>>>>>>>>> detail?
> >>>>>>>>>>>
> >>>>>>>>>>>       (6) validTo timestamp is "exclusive", right? Ie, if I
> query
> >>>>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next"
> record
> >>>> v2
> >>>>>>>>>>> with validFromV2=ts?
> >>>>>>>>>>>
> >>>>>>>>>>>       (7) The KIP says that segments are stored in the same RocksDB -- for
> >>>>>>>>>>> this case, how are efficient deletes handled? For windowed-store, we can
> >>>>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>>>
> >>>>>>>>>>>       (8) Rejected alternatives: you propose to not return the
> >>>> validTo
> >>>>>>>>>>> timestamp -- if we find it useful in the future to return it,
> >> would
> >>>>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 11/16/22 9:57 PM, Victoria Xia wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a proposal for introducing versioned state stores in
> >> Kafka
> >>>>>>>>>>> Streams.
> >>>>>>>>>>>> Versioned state stores are similar to key-value stores except
> >> they
> >>>>>>>>> can
> >>>>>>>>>>>> store multiple record versions for a single key. This KIP
> >> focuses
> >>>>>> on
> >>>>>>>>>>>> interfaces only in order to limit the scope of the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Victoria
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
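For reference, here is a minimal sketch of the inclusive time bound discussed under point (6) in the quoted thread, where `get(key, ts)` behaves like an "AS OF <time>" query. It is only one reading of that discussion, not the KIP's implementation: `AsOfStoreSketch` and `VersionedRecordSketch` are hypothetical names, the store is an in-memory map rather than RocksDB, and history retention and null values are ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the proposed return type; not the Kafka Streams class. */
final class VersionedRecordSketch<V> {
    final V value;
    final long timestamp; // the "validFrom" of this version

    VersionedRecordSketch(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** In-memory illustration of an "AS OF" lookup with an inclusive bound. */
final class AsOfStoreSketch<K, V> {
    // Per key: record versions ordered by their timestamp.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the latest version with timestamp <= asOfTimestamp, or null if none. */
    VersionedRecordSketch<V> get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        // floorEntry is inclusive: an entry exactly at asOfTimestamp is returned.
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : new VersionedRecordSketch<>(entry.getValue(), entry.getKey());
    }

    public static void main(String[] args) {
        AsOfStoreSketch<String, String> store = new AsOfStoreSketch<>();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        System.out.println(store.get("k", 20L).value); // v2: the bound includes ts = 20
        System.out.println(store.get("k", 19L).value); // v1
        System.out.println(store.get("k", 9L));        // null: no version that old
    }
}
```

Under this reading, a query at exactly the timestamp where v2 begins returns v2, so v1's validTo is effectively exclusive even though the query bound is inclusive.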

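The answer to point (7) in the quoted thread (keys prefixed with a segment ID, and a whole segment dropped with one range delete) can be illustrated in the same hedged way. This sketch uses a sorted in-memory map as a stand-in for RocksDB; the class name, the zero-padded string prefix, and the `|` separator are assumptions made for illustration, not the actual key layout.

```java
import java.util.TreeMap;

/** Sorted-map illustration of segment-prefixed keys; a stand-in for RocksDB, not its API. */
final class SegmentPrefixSketch {
    private final TreeMap<String, String> db = new TreeMap<>();

    /** Zero-padded so that lexicographic order matches numeric order (non-negative IDs only). */
    static String prefix(long segmentId) {
        return String.format("%019d|", segmentId);
    }

    void put(long segmentId, String key, String value) {
        db.put(prefix(segmentId) + key, value);
    }

    boolean contains(long segmentId, String key) {
        return db.containsKey(prefix(segmentId) + key);
    }

    /** Removes every key in [prefix(id), prefix(id + 1)), i.e. one whole segment. */
    void deleteSegment(long segmentId) {
        db.subMap(prefix(segmentId), true, prefix(segmentId + 1), false).clear();
    }

    int size() {
        return db.size();
    }

    public static void main(String[] args) {
        SegmentPrefixSketch store = new SegmentPrefixSketch();
        store.put(1L, "a", "x");
        store.put(1L, "b", "y");
        store.put(2L, "a", "z");
        store.deleteSegment(1L);          // one range delete drops both segment-1 entries
        System.out.println(store.size()); // 1
    }
}
```

Because all keys of a segment are contiguous in sort order, expiring a segment is a single half-open range delete rather than one delete per key.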