Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've just sent a message to start the vote on this KIP. Please have a look when you get the chance.
Thanks,
Victoria

On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for clarifying about the null-question. SGTM.
>
> On 12/13/22 3:06 PM, Victoria Xia wrote:
> > Hi Matthias,
> >
> > Thanks for chiming in! Barring objections from anyone on this thread, I will start the vote for this KIP on Thursday. That should be enough time to incorporate any lingering minor changes.
> >
> >> I slightly prefer to add `VersionedRecord` interface (also like the name). I agree that it's low overhead and providing a clean path forward for future changes seems worth it to me.
> >
> > OK, that makes two of us. I updated the KIP just now to formally include VersionedRecord as the new return type from the various VersionedKeyValueStore methods.
> >
> >> if we introduce `VersionedRecord`, I think we can keep the not-null requirement for `ValueAndTimestamp`
> >
> > Not quite. VersionedRecord is only used as a return type from read methods, which is why VersionedRecord is able to enforce that its value is never null. If the value being returned would have been null, then we return a null VersionedRecord instead, rather than non-null VersionedRecord with null value. So, there's no use case for a VersionedRecord with null value.
> >
> > In contrast, even though ValueAndTimestamp is not anywhere in the public VersionedKeyValueStore interface, ValueAndTimestamp still needs to be used internally when representing a versioned key-value store as a TimestampedKeyValueStore, since TimestampedKeyValueStore is used everywhere throughout the internals of the codebase. In order to represent a versioned key-value store as a TimestampedKeyValueStore, we have to support `put(K key, ValueAndTimestamp<V> value)`, which means ValueAndTimestamp needs to support null value (with timestamp).
> > Otherwise we cannot put a tombstone into a versioned key-value store when using the internal TimestampedKeyValueStore representation.
> >
> > It's very much an implementation detail that ValueAndTimestamp needs to be relaxed to allow null values. I think this is a minor enough change that is still preferable to the alternatives (refactoring the processors to not require TimestampedKeyValueStore, or introducing a separate workaround `put()` method on the TimestampedKeyValueStore representation of versioned key-value stores), so I have left it in as part of the KIP.
> >
> > Best,
> > Victoria
> >
> > On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks Victoria.
> >>
> >> I did not re-read the KIP in full on the wiki but only your email.
> >>
> >> Points (1)-(8) SGTM.
> >>
> >> About (9): I slightly prefer to add `VersionedRecord` interface (also like the name). I agree that it's low overhead and providing a clean path forward for future changes seems worth it to me. Btw: if we introduce `VersionedRecord`, I think we can keep the not-null requirement for `ValueAndTimestamp`, which seems a small side benefit. (Btw: your code snippet in the KIP shows that `VersionedRecord` would have a non-null requirement for the value, but I think it would need to allow null as value?)
> >>
> >>
> >> -Matthias
> >>
> >> On 12/7/22 5:23 PM, Victoria Xia wrote:
> >>> Thanks for the discussion, Bruno, Sagar, and Matthias!
> >>>
> >>> It seems we've reached consensus on almost all of the discussion points. I've updated the KIP with the following:
> >>> 1) renamed "timestampTo" in `get(key, timestampTo)` to "asOfTimestamp" to clarify that this timestamp bound is inclusive, per the SQL guideline that "AS OF <timestamp>" queries are inclusive.
> >>> In the future, if we want to introduce a timestamp range query, we can use `get(key, timestampFrom, timestampTo)` and specify that timestampTo is exclusive in this method, while avoiding confusion with the inclusive asOfTimestamp parameter in the other method, given that the names are different.
> >>> 2) added a description of "history retention" semantics into the VersionedKeyValueStore interface Javadoc, and updated the Javadoc for `get(key, asOfTimestamp)` to mention explicitly that a null result is returned if the provided timestamp bound is not within history retention.
> >>> 3) added a `delete(key, timestamp)` method (with return type `ValueAndTimestamp<V>`) to the VersionedKeyValueStore interface.
> >>> 4) updated the Javadoc for `segmentInterval` to clarify that the only reason a user might be interested in this parameter is performance.
> >>>
> >>> Other points we discussed which did not result in updates include:
> >>> 5) whether to automatically update the `min.compaction.lag.ms` config on changelog topics when history retention is changed -- there's support for this but let's not bundle it with this KIP. We can have a separate KIP to change this behavior for the existing windowed changelog topics, in addition to versioned changelog topics.
> >>> 6) should we expose segmentInterval in this KIP -- let's go ahead and expose it now since we'll almost certainly expose it (in this same manner) in a follow-on KIP anyway, and so that poor performance for user workloads is less likely to be a barrier for users getting started with this feature. I updated the Javadoc for this parameter to clarify why the Javadoc mentions performance despite Javadocs typically not doing so.
> >>> 7) `get(timestampFrom, timestampTo)` and other methods for IQ -- very important but deferred to a future KIP
> >>> 8) `purge(key)`/`deleteAllVersions(key)` -- deferred to a future KIP
> >>>
> >>> That leaves only one unresolved discussion point:
> >>> 9) whether to include validTo in the return types from `get(...)`. If we go with the current proposal of not including validTo in the return type, then it will not be easy to add it in the future (unless we want to add validTo to ValueAndTimestamp, which feels odd to me). If we think we might want to have validTo in the future, we can change the return type of `get(...)` and `delete(...)` in this proposal from `ValueAndTimestamp<V>` to a new type, e.g., `VersionedRecord<V>` or `RecordVersion<V>`, which today will look the same as `ValueAndTimestamp<V>` but in the future we can add validTo if we want. The cost is a new type which today looks the same as ValueAndTimestamp.
> >>>
> >>> Now that I think about it more, the cost to introducing a new type seems relatively low. I've added a proposal towards the bottom of the KIP here <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores#KIP889:VersionedStateStores-Additionalreturntimestampsfromget(key,asOfTimestamp)>. If others also believe that the cost of introducing this new interface is low (particularly relative to the flexibility it provides us for being able to evolve the class in the future), I will incorporate this proposal into the KIP. I think the hardest part of this will be deciding on a name for the new class :)
> >>>
> >>> Pending objections, I'd like to make a call on item (9) and call a vote on this KIP at the end of this week.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
> >>> On Thu, Dec 1, 2022 at 9:47 PM Matthias J.
> >>> Sax <mj...@apache.org> wrote:
> >>>
> >>>> Thanks Victoria!
> >>>>
> >>>> (1) About `ReadOnlyVersionedKeyValueStore` -- I am not sure about IQv1 vs IQv2. But you might be right that adding the interface later might not be an issue -- so it does not matter. Just wanted to double check.
> >>>>
> >>>> (2) About `delete(key, ts)` -- as already discussed, I agree that it should have the same semantics as `put(key, null, ts)` (delete() needs a timestamp). Not sure if `delete()` really needs to return anything? I would be ok to make it `void` -- but I think it's also semantically sound if it returns the "old" value at timestamp `ts` that the delete actually deleted, as you mentioned -- in the end, a "delete" is a physical append anyway (ie, "soft delete") as we want to track history.
> >>>>
> >>>> (3)
> >>>>> Ah, great question. I think the question boils down to: do we want to require that all versioned stores (including custom user implementations) use "history retention" to determine when to expire old record versions?
> >>>>
> >>>> I personally think, yes. The main reason for this is that I think we need to have a clear contract so we can plug in custom implementations into the DSL later? -- I guess, having a stricter contract initially, and relaxing it later if necessary, is the easier way forward than the other way around.
> >>>>
> >>>> For PAPI users, they are not bound to implement the interface anyway and can just add any store they like by extending the top level `StateStore` interface.
> >>>>
> >>>> (4) About `segmentInterval`: I am personally fine both ways. Seems it's your call to expose it or not. It seems there is a slight preference to expose it.
> >>>>
> >>>> (5) About `validTo`: based on my experience, it's usually simpler to have it exclusive. It's also how it's defined in "system versioned temporal tables" in the SQL standard, and how `AS OF <ts>` queries work.
> >>>>
> >>>> For a join, it of course implies that if a table record has [100,200) as inclusive `validFrom=100` and exclusive `validTo=200` it would only join with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200 :)).
> >>>>
> >>>> I would strongly advocate to make the upper bound exclusive (it did serve us well in the past to align to SQL semantics). It must be clearly documented of course and we can also name variables accordingly if necessary.
> >>>>
> >>>> (6) About including `validTo` in return types -- it's not easy to change the return type, because the signature of a method is only determined by its name and input parameter types, ie, we cannot overload an existing method to just change the return type, but would need to change its name or parameter list... Not sure if we can or cannot add `validTo` to `ValueAndTimestamp` though, but it's a tricky question. Would be good to get some more input from others if we think that it would be important enough to worry about it now or not.
> >>>>
> >>>> (7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would prefer to just keep `get()` with two overloads and not add `getAsOf()`; the fact that we pass in a timestamp implies we have a point in time query. (It's cleaner API design to leverage method overloads IMHO, and it's what we did in the past). Of course, we can name the parameter `get(key, asOfTimestamp)` if we think it's helpful. And in alignment to have `validTo` exclusive, `validTo` would be `asOfTimestamp+1` (or larger), in case we return it.
> >>>>
> >>>> (8) About updating topic config (ie, history retention and compaction lag): I think it was actually some oversight to not update topic configs if the code changes. There is actually a Jira ticket about it. I would prefer to keep the behavior consistent though and not change it just for the new versioned-store, but change it globally in one shot independent of this KIP.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/1/22 10:15 AM, Sagar wrote:
> >>>>> Thanks Victoria,
> >>>>>
> >>>>> I guess an advantage of exposing a method like delete(key, timestamp) could be that from a user's standpoint, it is a single operation and not 2. The equivalent of this method, i.e., put followed by get, is not atomic, so exposing it certainly sounds like a good idea.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia <victoria....@confluent.io.invalid> wrote:
> >>>>>
> >>>>>> Thanks, Sagar and Bruno, for your insights and comments!
> >>>>>>
> >>>>>>> Sagar: Can we name according to the semantics that you want to support like `getAsOf` or something like that? I am not sure if we do that in our codebase though. Maybe the experts can chime in.
> >>>>>>
> >>>>>> Because it is a new method that will be added, we should be able to name it whatever we like. I agree `getAsOf` is more clear, albeit wordier. Introducing `getAsOf(key, timestamp)` means we could leave open `get(key, timeFrom, timeTo)` to have an exclusive `timeTo` without introducing a collision. (We could introduce `getBetween(key, timeFrom, timeTo)` instead to delineate even more clearly, though this is better left for a future KIP.)
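[Editor's illustration] The inclusive "as of" bound discussed above can be sketched in a few lines. This is only a sketch under stated assumptions: `AsOfLookupSketch` and its methods are hypothetical names invented here, it models a single key, and it is not the interface proposed in the KIP. It assumes each version is stored under the timestamp at which it became valid, so an inclusive lookup is a floor search on that timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the KIP-889 interface.
class AsOfLookupSketch {
    // Versions of a single key, keyed by the timestamp at which each version became valid.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(String value, long timestamp) {
        versions.put(timestamp, value);
    }

    // Inclusive "AS OF" lookup: the latest version whose validFrom is <= asOfTimestamp.
    String getAsOf(long asOfTimestamp) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        AsOfLookupSketch store = new AsOfLookupSketch();
        store.put("e", 0L);
        store.put("f", 2L);
        store.put("g", 3L);
        // A version written at exactly ts=2 is visible to an as-of query at ts=2.
        System.out.println(store.getAsOf(2L)); // prints f
        System.out.println(store.getAsOf(1L)); // prints e
    }
}
```

Because the bound is inclusive, a query at exactly the timestamp of a write sees that write, which matches the join expectation described later in the thread.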
> >>>>>>
> >>>>>> I don't think there's any existing precedent in the codebase to follow here but I'll leave that to the experts. Curious to hear what others prefer as well.
> >>>>>>
> >>>>>>> Sagar: With delete, we would still keep the older versions of the key right?
> >>>>>>
> >>>>>> We could certainly choose this for the semantics of delete(...) -- and it sounds like we should too, based on Bruno's confirmation below that this feels more natural to him as well -- but as Bruno noted in his message below I think we'll want the method signature to be `delete(key, timestamp)` then, so that there is an explicit timestamp to associate with the deletion. In other words, `delete(key, timestamp)` has the same effect as `put(key, null, timestamp)`. The only difference is that the `put(...)` method has a `void` return type, while `delete(key, timestamp)` can have `ValueAndTimestamp` as return type in order to return the record which is replaced (if any). In other words, `delete(key, timestamp)` is equivalent to `put(key, null, timestamp)` followed by `get(key, timestamp)`.
> >>>>>>
> >>>>>>> Bruno: I would also not change the semantics so that it deletes all versions of a key. I would rather add a new method purge(key) or deleteAllVersions(key) or similar if we want to have such a method in this first KIP.
> >>>>>>
> >>>>>> Makes sense; I'm convinced. Let's defer `purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's agreement that `delete(key, timestamp)` (as described above) is valuable, we can keep it in this first KIP even though it is syntactic sugar. If this turns into a larger discussion, we can defer this to a future KIP as well.
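[Editor's illustration] A minimal in-memory sketch of the `delete(key, timestamp)` semantics described above: the delete writes a tombstone at the given timestamp, older versions stay readable, and the previously visible value is returned. All names here (`DeleteAsTombstoneSketch`, the `String` key and value types, the plain `String` return type) are assumptions made for illustration and are not the KIP's API. One choice of this sketch is to read the visible version before writing the tombstone, so that there is a replaced value to return.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model; names and types are illustrative, not the KIP-889 API.
class DeleteAsTombstoneSketch {
    // key -> (validFrom timestamp -> value); a null value marks a tombstone.
    private final Map<String, TreeMap<Long, String>> data = new HashMap<>();

    void put(String key, String value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest value with validFrom <= asOfTimestamp; null if absent or deleted as of that time.
    String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> versions = data.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // Same effect on the store as put(key, null, timestamp), plus the replaced value as result.
    String delete(String key, long timestamp) {
        String previous = get(key, timestamp); // read first, so the old value can be returned
        put(key, null, timestamp);             // tombstone; earlier versions are kept
        return previous;
    }

    public static void main(String[] args) {
        DeleteAsTombstoneSketch store = new DeleteAsTombstoneSketch();
        store.put("A", "e", 0L);
        store.put("A", "f", 2L);
        store.put("A", "g", 3L);
        System.out.println(store.delete("A", 5L)); // prints g
        System.out.println(store.get("A", 4L));    // prints g (history is kept)
        System.out.println(store.get("A", 5L));    // prints null (deleted as of ts=5)
    }
}
```

The data mirrors the (A, e), (A, f), (A, g) example that Bruno gives further down the thread, with the delete applied at timestamp 5.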
> >>>>>>
> >>>>>>> Bruno: I would treat the history retention as a strict limit. [...] You could also add historyRetentionMs() to the VersionedKeyValueStore<K, V> interface to make the concept of the history retention part of the interface.
> >>>>>>
> >>>>>> OK. That's the second vote for rewording the javadoc for `VersionedKeyValueStore#get(key, timestampTo)` to remove the parenthetical and clarify that history retention should be used to dictate this case, so I'll go ahead and do that. I'll leave out adding `historyRetentionMs()` to the interface for now, though, for the sake of consistency with other stores (e.g., window stores) which don't expose similar types of configurations from their interfaces.
> >>>>>>
> >>>>>>> Bruno: exclusive vs inclusive regarding validTo timestamp in get(). Doesn't this decision depend on the semantics of the join for which this state store should be used?
> >>>>>>
> >>>>>> Yes, you are correct. As a user I would expect that a stream-side record with the same timestamp as a table-side record _would_ produce a join result, which is consistent with the proposal for timestampTo to be inclusive. (FWIW I tried this out with a Flink temporal join just now and observed this result as well. Not sure where to look for other standards to validate this expectation.)
> >>>>>>
> >>>>>>> Bruno: If Streams does not update min.compaction.lag.ms during rebalances, users have to do it each time they change history retention in the code, right? That seems odd to me. What is the actual reason for not updating the config? How does Streams handle updates to windowed stores?
> >>>>>>
> >>>>>> Yes, users will have to update min.compaction.lag.ms for the changelog topic themselves if they update history retention in their code. This is consistent with what happens for window stores today: e.g., if a user updates grace period for a windowed aggregation, then they are responsible for updating retention.ms on their windowed changelog topic as well.
> >>>>>>
> >>>>>> I'm not familiar with the historical context around why this is the case -- Matthias, do you know?
> >>>>>>
> >>>>>> My best guess is that Streams does not want to interfere with any potential out-of-band changes by the user between application restarts, though I'm not sure why a user would want to change this specific config to a value which does not accord with the specified history retention. I notice that there is code for validating topic configs and collecting validation errors (https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L318-L319) but this method is not called from anywhere, even though there are unit tests for it. I was unable to find history of this validation after a quick search. Hopefully Matthias (or others) has context, otherwise I will have a closer look.
> >>>>>>
> >>>>>> - Victoria
> >>>>>>
> >>>>>> On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks for the KIP, Victoria!
> >>>>>>>
> >>>>>>> I have a couple of comments.
> >>>>>>>
> >>>>>>> 1. delete(key)
> >>>>>>> I think delete(key) should not remove all versions of a key. We should use it to close the validity interval of the last version.
> >>>>>>> Assuming we have records of different versions for key A:
> >>>>>>> (A, e, 0, 2),
> >>>>>>> (A, f, 2, 3),
> >>>>>>> (A, g, 3, MAX)
> >>>>>>>
> >>>>>>> delete(A) would update them to
> >>>>>>>
> >>>>>>> (A, e, 0, 2),
> >>>>>>> (A, f, 2, 3),
> >>>>>>> (A, g, 3, 5),
> >>>>>>> (A, null, 5, MAX)
> >>>>>>>
> >>>>>>> But then the question arises where does timestamp 5 that closes the interval in (A, g, 3, 5) and opens the interval in (A, null, 5, MAX) come from. We could use the timestamp at which delete(A) is called, but actually I do not like that because it seems to me it opens the doors to non-determinism. If we use event time for put() we should also use it for delete(). Actually, put(A, null, 5) would have the same effect as delete(A) in the example above. As syntactic sugar, we could add delete(key, validFrom). (I just realized now that I just repeated what Victoria said in her previous e-mail.)
> >>>>>>> I agree with Victoria that delete(A) as defined for other state stores is hard to re-use in the versioned key-value store.
> >>>>>>> I would also not change the semantics so that it deletes all versions of a key. I would rather add a new method purge(key) or deleteAllVersions(key) or similar if we want to have such a method in this first KIP.
> >>>>>>>
> >>>>>>>
> >>>>>>> 2. history retention
> >>>>>>> I would remove "(up to store implementation discretion when this is the case)". I would treat the history retention as a strict limit. If users want to implement a less strict behavior, they can still do it. Maybe mention in the javadocs the implications of not adhering strictly to the history retention. That is, the DSL might become non-deterministic.
> >>>>>>> You could also add historyRetentionMs() to the VersionedKeyValueStore<K, V> interface to make the concept of the history retention part of the interface.
> >>>>>>>
> >>>>>>> 3. null vs. exception for out-of-bound queries
> >>>>>>> I am in favor of null. The record version is not there anymore because it expired. This seems to me normal and nothing exceptional. That would also be consistent with the behavior of other APIs as already mentioned.
> >>>>>>>
> >>>>>>>
> >>>>>>> 4. Exposing segmentInterval
> >>>>>>> Since we have evidence that the segment interval affects performance, I would expose it. But I find it also OK to expose it once we have a corresponding metric.
> >>>>>>>
> >>>>>>> 5. exclusive vs inclusive regarding validTo timestamp in get()
> >>>>>>> Doesn't this decision depend on the semantics of the join for which this state store should be used? Should a record on the table side that has the same timestamp as the record on the stream side join? Or should only records in the table that are strictly before the record on the stream side join?
> >>>>>>>
> >>>>>>>
> >>>>>>> 6. Not setting min.compaction.lag.ms during rebalances
> >>>>>>> If Streams does not update min.compaction.lag.ms during rebalances, users have to do it each time they change history retention in the code, right? That seems odd to me. What is the actual reason for not updating the config? How does Streams handle updates to windowed stores? That should be a similar situation for the retention time config of the changelog topic.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 23.11.22 09:11, Sagar wrote:
> >>>>>>>> Hi Vicky,
> >>>>>>>>
> >>>>>>>> Thanks for your response!
> >>>>>>>>
> >>>>>>>> I would just use numbers to refer to your comments.
> >>>>>>>>
> >>>>>>>> 1) Thanks for your response. Even I am not totally sure whether these should be supported via IQv2 or via store interface. That said, I definitely wouldn't qualify this as blocking the KIP, so we can live without it :)
> >>>>>>>>
> >>>>>>>> 2) Yeah if the 2 APIs for get have different semantics for timestampTo, then it could be confusing. I went through the link for temporal tables (TFS!) and I now get why the AS OF semantics would have it inclusive. I think part of the problem is that the name get on its own is not as expressive as SQL. Can we name according to the semantics that you want to support like `getAsOf` or something like that? I am not sure if we do that in our codebase though. Maybe the experts can chime in.
> >>>>>>>>
> >>>>>>>> 3) hmm I would have named it `validUpto`, but again not very picky about it. After going through the link and your KIP, it's a lot clearer to me.
> >>>>>>>>
> >>>>>>>> 4) I think delete(key) should be sufficient. With delete, we would still keep the older versions of the key right?
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> Sagar.
> >>>>>>>>
> >>>>>>>> On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia <victoria....@confluent.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks, Matthias and Sagar, for your comments! I've responded here for now, and will update the KIP afterwards with the outcome of our discussions as they resolve.
> >>>>>>>>>
> >>>>>>>>> ----------- Matthias's comments -----------
> >>>>>>>>>
> >>>>>>>>>> (1) Why does the new store not extend KeyValueStore, but StateStore? In the end, it's a KeyValueStore?
> >>>>>>>>>
> >>>>>>>>> A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>` because many of the KeyValueStore methods would not make sense for a versioned store. For example, `put(K key, V value)` is not meaningful for a versioned store because the record needs a timestamp associated with it.
> >>>>>>>>>
> >>>>>>>>> A `VersionedKeyValueStore<K, V>` is more similar to a `KeyValueStore<K, ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, V>`), but some of the TimestampedKeyValueStore methods are still problematic. For example, what does it mean for `delete(K key)` to have return type `ValueAndTimestamp<V>`? Does this mean that `delete(K key)` only deletes (and returns) the latest record version for the key? Probably we want a versioned store to have `delete(K key)` delete all record versions for the given key, in which case the return type is better suited as an iterator/collection of KeyValueTimestamp. `putIfAbsent(K key, ValueAndTimestamp value)` also has ambiguous semantics for versioned stores (i.e., what does it mean for the key/record to be "absent").
> >>>>>>>>>
> >>>>>>>>> I agree that conceptually a versioned key-value store is just a key-value store, though. In the future if we redesign the store interfaces, it'd be great to unify them by having a more generic KeyValueStore interface that allows for extra flexibility to support different types of key-value stores, including versioned stores. (Or, if you can think of a way to achieve this with the existing interfaces today, I'm all ears!)
> >>>>>>>>>
> >>>>>>>>>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't want to support IQ in this KIP, it might be good to add this interface right away to avoid complications for follow up KIPs? Or won't there be any complications anyway?
> >>>>>>>>>
> >>>>>>>>> I don't think there will be complications for refactoring to add this interface in the future. Refactoring out ReadOnlyVersionedKeyValueStore from VersionedKeyValueStore would leave VersionedKeyValueStore unchanged from the outside.
> >>>>>>>>>
> >>>>>>>>> Also, is it true that the ReadOnlyKeyValueStore interface is only used for IQv1 and not IQv2? I think it's an open question as to whether we should support IQv1 for versioned stores or only IQv2. If the latter, then maybe we won't need the extra interface at all.
> >>>>>>>>>
> >>>>>>>>>> (3) Why do we not have a `delete(key)` method? I am ok with not supporting all methods from existing KV-store, but a `delete(key)` seems to be fundamental to have?
> >>>>>>>>>
> >>>>>>>>> What do you think the semantics of `delete(key)` should be for versioned stores? Should `delete(key)` delete (and return) all record versions for the key? Or should we have `delete(key, timestamp)` which is equivalent to `put(key, null, timestamp)` except with a return type to return ValueAndTimestamp representing the record it replaced?
> >>>>>>>>>
> >>>>>>>>> If we have ready alignment on what the interface and semantics for `delete(key)` should be, then adding it in this KIP sounds good.
> >>>>>>>>> I just didn't want the rest of the KIP to be hung up over additional interfaces, given that we can always add extra interfaces in the future.
> >>>>>>>>>
> >>>>>>>>>> (4a) Do we need `get(key)`? It seems to be the same as `get(key, MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for my own clarification (should we add something to the JavaDocs?).
> >>>>>>>>>
> >>>>>>>>> Correct, it is just syntactic sugar. I will add a clarification into the Javadocs as you've suggested.
> >>>>>>>>>
> >>>>>>>>>> (4b) Should we throw an exception if a user queries out-of-bound instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>> -> You put it into "rejected alternatives", and I understand your argument. Would love to get input from others about this question though. -- It seems we also return `null` for windowed stores, so maybe the strongest argument is to align to existing behavior? Or do we have a case for which the current behavior is problematic?
> >>>>>>>>>
> >>>>>>>>> Sure; curious to hear what others think as well.
> >>>>>>>>>
> >>>>>>>>>> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation discretion when this is the case)" -> Should we make it a stricter contract such that the user can reason about it better (there is WIP to make retention time a strict bound for windowed stores atm)
> >>>>>>>>>> -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a strict bound, too.
> >>>>>>>>>
> >>>>>>>>> Ah, great question. I think the question boils down to: do we want to require that all versioned stores (including custom user implementations) use "history retention" to determine when to expire old record versions?
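[Editor's illustration] For readers unfamiliar with the term, here is one way a strict "history retention" bound on `get(key, ts)` could behave. This is a sketch under assumptions that the thread does not spell out: the class name `HistoryRetentionSketch` is hypothetical, a single key is modeled, and the oldest queryable timestamp is assumed to be the largest timestamp written so far minus the retention period. The actual store may compute the bound differently.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model; the bound computation below is an assumption of this sketch.
class HistoryRetentionSketch {
    private final long historyRetentionMs;
    private final TreeMap<Long, String> versions = new TreeMap<>();
    private long observedStreamTime; // largest timestamp written so far

    HistoryRetentionSketch(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(String value, long timestamp) {
        if (versions.isEmpty() || timestamp > observedStreamTime) {
            observedStreamTime = timestamp;
        }
        versions.put(timestamp, value);
    }

    // Returns null when the timestamp bound has fallen out of history retention,
    // even if an old version is still physically present.
    String get(long asOfTimestamp) {
        if (versions.isEmpty()) {
            return null;
        }
        long oldestQueryable = observedStreamTime - historyRetentionMs;
        if (asOfTimestamp < oldestQueryable) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        HistoryRetentionSketch store = new HistoryRetentionSketch(10L);
        store.put("v1", 0L);
        store.put("v2", 100L);
        System.out.println(store.get(95L)); // prints v1 (95 is within retention of 100)
        System.out.println(store.get(50L)); // prints null (50 is older than 100 - 10)
    }
}
```

Returning null rather than throwing follows the preference expressed elsewhere in the thread for out-of-bound queries.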
> >>>>>>>>>
> >>>>>>>>> Because the `persistentVersionedKeyValueStore(...)` method returns instances of the provided RocksDB-based versioned store implementation, which does use history retention for this purpose, that's why we can very clearly say that for this store, `get(key, ts)` will return null if the provided timestamp bound has fallen out of history retention. The reason I left the `VersionedKeyValueStore#get(key, ts)` Javadoc more generic (i.e., does not mention history retention) is because maybe a user implementing their own custom store will choose a different expiry mechanism, e.g., keep the three latest versions for each key regardless of how old the timestamps are.
> >>>>>>>>>
> >>>>>>>>> If we want to require that all versioned stores use history retention in order to determine when to expire old records, then I will certainly update the Javadoc to clarify. This is already a requirement for DSL users because the VersionedBytesStoreSupplier interface requires history retention to be provided (in order for changelog topic configs to be properly set), so it's just a question of whether we also want to require PAPI users to use history retention too. I had a look at the existing window stores and didn't see precedent for requiring all window stores have a standard "retention time" concept for how long to keep windows, but if we want to have a standard "history retention" concept for versioned stores we certainly can. WDYT?
> >>>>>>>>>
> >>>>>>>>>> (5a) Do we need to expose `segmentInterval`?
> >>>>>>>>> For windowed-stores, we also use segments but hard-code it to two (it was
> >>>>>>>>> exposed in earlier versions but it seems not useful, even if we would be
> >>>>>>>>> open to expose it again if there is user demand).
> >>>>>>>>>
> >>>>>>>>> If we want to leave it out of this first KIP (and potentially expose it in
> >>>>>>>>> the future), that works for me. The performance benchmarks I ran suggest
> >>>>>>>>> that this parameter greatly impacts store performance though and is very
> >>>>>>>>> workload dependent. If a user reported poor performance using versioned
> >>>>>>>>> stores for their workload, this is the first parameter I would want to
> >>>>>>>>> tune. That said, metrics/observability for versioned stores (which would be
> >>>>>>>>> helpful for determining how this parameter should be adjusted) have been
> >>>>>>>>> deferred to a follow-up KIP, so perhaps that's reason to defer exposing
> >>>>>>>>> this parameter as well.
> >>>>>>>>>
> >>>>>>>>>> (5b) JavaDocs says: "Performance degrades as more record versions for
> >>>>>>>>> the same key are collected in a single segment. On the other hand,
> >>>>>>>>> out-of-order writes and reads which access older segments may slow down
> >>>>>>>>> if there are too many segments." -- Wondering if JavaDocs should make
> >>>>>>>>> any statements about expected performance? Seems to be an implementation
> >>>>>>>>> detail?
> >>>>>>>>>
> >>>>>>>>> I included this sentence to explain why a user might want to tune this
> >>>>>>>>> value / help guide how to think about the parameter, but if we want to
> >>>>>>>>> remove it entirely (per the discussion point above) then this Javadoc will
> >>>>>>>>> be removed with it.
> >>>>>>>>>
> >>>>>>>>>> (6) validTo timestamp is "exclusive", right?
> >>>>>>>>> I.e., if I query `get(key,ts[=validToV1])` I would get `null` or the "next"
> >>>>>>>>> record v2 with validFromV2=ts?
> >>>>>>>>>
> >>>>>>>>> I actually intended for it to be inclusive (will update the KIP). Do you
> >>>>>>>>> think exclusive is more intuitive? The reason I had inclusive in my mind is
> >>>>>>>>> because it's like an "AS OF <time>" query, which treats the time bound as
> >>>>>>>>> inclusive.
> >>>>>>>>>
> >>>>>>>>>> (7) The KIP says that segments are stored in the same RocksDB -- for
> >>>>>>>>> this case, how are efficient deletes handled? For windowed-store, we can
> >>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>
> >>>>>>>>> The way that multiple segments are represented in the same RocksDB is that
> >>>>>>>>> the RocksDB keys are prefixed with segment ID. An entire segment is deleted
> >>>>>>>>> with a single `deleteRange()` call to RocksDB.
> >>>>>>>>>
> >>>>>>>>>> (8) Rejected alternatives: you propose to not return the validTo
> >>>>>>>>> timestamp -- if we find it useful in the future to return it, would
> >>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>
> >>>>>>>>> With the current proposal, there's no clean path. If we think there's a
> >>>>>>>>> good chance we might want to do this in the future, then we should update
> >>>>>>>>> the proposed interfaces.
> >>>>>>>>>
> >>>>>>>>> The current proposed return type from
> >>>>>>>>> `VersionedKeyValueStore<K, V>#get(key, tsTo)` is `ValueAndTimestamp<V>`.
> >>>>>>>>> There's no way to add a second timestamp into `ValueAndTimestamp<V>`, which
> >>>>>>>>> is why there's no clean path to include validTo timestamp in the future
> >>>>>>>>> under the existing proposal.
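For (7), the segment-prefix layout can be sketched in plain Java. This is only an illustration under stated assumptions: a `TreeMap` stands in for the single RocksDB instance, and `SegmentedStoreSketch`, the zero-padded prefix format, and all method names are hypothetical rather than taken from the KIP or the actual store. Segment IDs are assumed to be non-negative.

```java
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one RocksDB instance that holds several segments.
// Keys have the form "<zero-padded segment id>:<user key>", so the entries of one
// segment are contiguous in sort order.
class SegmentedStoreSketch {
    private final TreeMap<String, String> db = new TreeMap<>();

    // Fixed-width ids keep lexicographic order aligned with numeric order
    // (assumes non-negative segment ids).
    private static String prefix(long segmentId) {
        return String.format("%019d:", segmentId);
    }

    void put(long segmentId, String key, String value) {
        db.put(prefix(segmentId) + key, value);
    }

    String get(long segmentId, String key) {
        return db.get(prefix(segmentId) + key);
    }

    // Drops every entry of one segment with a single range operation over
    // [prefix(id), prefix(id + 1)), similar in spirit to one deleteRange() call.
    void deleteSegment(long segmentId) {
        db.subMap(prefix(segmentId), true, prefix(segmentId + 1), false).clear();
    }

    int size() {
        return db.size();
    }
}
```

Because the prefix sorts first, expiring a segment never needs to look at individual keys; the cost of the delete is one range operation regardless of how many record versions the segment holds.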
> >>>>>>>>>
> >>>>>>>>> If we wanted to allow for including validTo timestamp in the future, we'd
> >>>>>>>>> instead update the return type to be a new `VersionedRecord<V>` object.
> >>>>>>>>> Today a `VersionedRecord<V>` could just include `value` and `timestamp`,
> >>>>>>>>> and in the future we could add `validTo` (names subject to change) into the
> >>>>>>>>> `VersionedRecord` as well. (It'd look a little strange for now since
> >>>>>>>>> VersionedRecord is the same as ValueAndTimestamp, but that seems fine.)
> >>>>>>>>>
> >>>>>>>>> If we choose to do this, I think we should also update the return type of
> >>>>>>>>> `VersionedKeyValueStore#get(key)` to be VersionedRecord as well, rather
> >>>>>>>>> than having one return ValueAndTimestamp while the other returns
> >>>>>>>>> VersionedRecord.
> >>>>>>>>>
> >>>>>>>>> ----------- Sagar's comments -----------
> >>>>>>>>>
> >>>>>>>>>> 1) Did you consider adding a method similar to:
> >>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>> I think this could be useful considering that this
> >>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>
> >>>>>>>>> Yes, I do think this method is valuable. I think we will definitely want to
> >>>>>>>>> support time-range based queries at some point (hopefully soon), and likely
> >>>>>>>>> also key-range based queries (to achieve feature parity with existing
> >>>>>>>>> key-value stores).
> >>>>>>>>>
> >>>>>>>>> It's not immediately clear to me whether these types of queries should be
> >>>>>>>>> supported as part of the store interface or if they should only be
> >>>>>>>>> supported via the `query(...)` method for IQv2. (It's an open question as
> >>>>>>>>> to whether we should support IQv1 for versioned stores or only IQv2.
> >>>>>>>>> A benefit of IQv2 over IQv1 is that we won't need to add individual store
> >>>>>>>>> methods for each type of query, including for all wrapped store layers.)
> >>>>>>>>>
> >>>>>>>>> If we have clear non-IQ use cases for these methods (e.g., use cases within
> >>>>>>>>> processors), then they'll need to be added as part of the store interface
> >>>>>>>>> for sure. I'm leaning towards adding them as part of the store interface
> >>>>>>>>> but given the ambiguity here, it may be preferable to defer to a follow-up
> >>>>>>>>> KIP. OTOH, if you think the versioned store interface as proposed in this
> >>>>>>>>> KIP is too bare bones to be useful, I'm open to adding it in now as well.
> >>>>>>>>>
> >>>>>>>>>> 2) I have a similar question as Matthias, about the timestampTo argument
> >>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>
> >>>>>>>>> Same answer (and follow-up question) as above. Do you think it will be
> >>>>>>>>> confusing for `get(key, tsTo)` to use an inclusive time bound, while
> >>>>>>>>> `get(key, tsFrom, tsTo)` would use an exclusive tsTo time bound? Maybe we
> >>>>>>>>> should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or
> >>>>>>>>> `getRange(...)` in order to avoid confusion.
> >>>>>>>>>
> >>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is essentially the
> >>>>>>>>> timestamp at which the record was inserted. validFrom makes it sound like
> >>>>>>>>> validTo which can keep changing based on new records while *from* is fixed.
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>> "It is essentially the timestamp at which the record was inserted" <-- Yes,
> >>>>>>>>> that's correct.
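To make the validFrom/validTo reading and the inclusive `get(key, tsTo)` bound concrete, here is a minimal Java sketch for a single key. It is an illustration only: `VersionHistorySketch` and its methods are hypothetical, it ignores history retention and deletes entirely, and it assumes one reading of the thread, namely that a version's validFrom is its record timestamp and its validTo is the validFrom of the next version.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical version history for one key; not the KIP's store implementation.
class VersionHistorySketch {
    // validFrom timestamp -> value. A version's validTo is the next entry's validFrom.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(String value, long timestamp) {
        versions.put(timestamp, value);
    }

    // Inclusive "AS OF" lookup: the latest version whose validFrom <= asOfTimestamp,
    // or null if no version is that old.
    String get(long asOfTimestamp) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // Syntactic sugar for the latest version, mirroring get(key) vs get(key, MAX_VALUE).
    String get() {
        return get(Long.MAX_VALUE);
    }
}
```

In this sketch, with v1 written at timestamp 10 and v2 at timestamp 20, a lookup at 19 returns v1 and a lookup at exactly 20 returns v2, so the query bound is inclusive while v1's validTo behaves as exclusive.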
> >>>>>>>>>
> >>>>>>>>> I borrowed the "validFrom/validTo" terminology from temporal tables, e.g.,
> >>>>>>>>> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16
> >>>>>>>>> I don't believe the terms "validFrom" or "validTo" are currently exposed
> >>>>>>>>> anywhere in any of the user-facing interfaces (or Javadocs); I just needed
> >>>>>>>>> a way to refer to the concepts in the KIP. Hopefully this is a non-issue
> >>>>>>>>> (at least for now) as a result. Do you have a suggestion for terminology
> >>>>>>>>> that would've been less confusing?
> >>>>>>>>>
> >>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>
> >>>>>>>>> Makes sense. It'd be great to get your input on the same follow-up
> >>>>>>>>> questions I asked Matthias above as well :)
> >>>>>>>>>
> >>>>>>>>> On Tue, Nov 22, 2022 at 4:25 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Victoria,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP. Seems like a very interesting idea!
> >>>>>>>>>>
> >>>>>>>>>> I have a couple of questions:
> >>>>>>>>>>
> >>>>>>>>>> 1) Did you consider adding a method similar to:
> >>>>>>>>>> List<ValueAndTimestamp<V>> get(K key, long from, long to)?
> >>>>>>>>>>
> >>>>>>>>>> I think this could be useful considering that this
> >>>>>>>>>> versioning scheme unlocks time travel at a key basis. WDYT?
> >>>>>>>>>>
> >>>>>>>>>> 2) I have a similar question as Matthias, about the timestampTo argument
> >>>>>>>>>> when doing a get. Is it inclusive or exclusive?
> >>>>>>>>>>
> >>>>>>>>>> 3) validFrom sounds slightly confusing to me. It is essentially the
> >>>>>>>>>> timestamp at which the record was inserted.
> >>>>>>>>>> validFrom makes it sound like validTo which can keep changing based on
> >>>>>>>>>> new records while *from* is fixed.
> >>>>>>>>>> WDYT?
> >>>>>>>>>>
> >>>>>>>>>> 4) Even I think delete api should be supported.
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Sagar.
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP Victoria. Very well written!
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Couple of questions (many might just require to add some more details to
> >>>>>>>>>>> the KIP):
> >>>>>>>>>>>
> >>>>>>>>>>> (1) Why does the new store not extend KeyValueStore, but StateStore?
> >>>>>>>>>>> In the end, it's a KeyValueStore?
> >>>>>>>>>>>
> >>>>>>>>>>> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> >>>>>>>>>>> want to support IQ in this KIP, it might be good to add this interface
> >>>>>>>>>>> right away to avoid complications for follow up KIPs? Or won't there be
> >>>>>>>>>>> any complications anyway?
> >>>>>>>>>>>
> >>>>>>>>>>> (3) Why do we not have a `delete(key)` method? I am ok with not
> >>>>>>>>>>> supporting all methods from existing KV-store, but a `delete(key)` seems
> >>>>>>>>>>> to be fundamental to have?
> >>>>>>>>>>>
> >>>>>>>>>>> (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> >>>>>>>>>>> MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? Just for
> >>>>>>>>>>> my own clarification (should we add something to the JavaDocs?).
> >>>>>>>>>>>
> >>>>>>>>>>> (4b) Should we throw an exception if a user queries out-of-bound
> >>>>>>>>>>> instead of returning `null` (in `get(key,ts)`)?
> >>>>>>>>>>> -> You put it into "rejected alternatives", and I understand your
> >>>>>>>>>>> argument.
> >>>>>>>>>>> Would love to get input from others about this question
> >>>>>>>>>>> though. -- It seems we also return `null` for windowed stores, so maybe
> >>>>>>>>>>> the strongest argument is to align to existing behavior? Or do we have a
> >>>>>>>>>>> case for which the current behavior is problematic?
> >>>>>>>>>>>
> >>>>>>>>>>> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> >>>>>>>>>>> discretion when this is the case)" -> Should we make it a stricter
> >>>>>>>>>>> contract such that the user can reason about it better (there is WIP to
> >>>>>>>>>>> make retention time a strict bound for windowed stores atm)
> >>>>>>>>>>> -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
> >>>>>>>>>>> strict bound, too.
> >>>>>>>>>>>
> >>>>>>>>>>> (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> >>>>>>>>>>> also use segments but hard-code it to two (it was exposed in earlier
> >>>>>>>>>>> versions but it seems not useful, even if we would be open to expose it
> >>>>>>>>>>> again if there is user demand).
> >>>>>>>>>>>
> >>>>>>>>>>> (5b) JavaDocs says: "Performance degrades as more record versions for
> >>>>>>>>>>> the same key are collected in a single segment. On the other hand,
> >>>>>>>>>>> out-of-order writes and reads which access older segments may slow down
> >>>>>>>>>>> if there are too many segments." -- Wondering if JavaDocs should make
> >>>>>>>>>>> any statements about expected performance? Seems to be an implementation
> >>>>>>>>>>> detail?
> >>>>>>>>>>>
> >>>>>>>>>>> (6) validTo timestamp is "exclusive", right? I.e., if I query
> >>>>>>>>>>> `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> >>>>>>>>>>> with validFromV2=ts?
> >>>>>>>>>>>
> >>>>>>>>>>> (7) The KIP says that segments are stored in the same RocksDB -- for
> >>>>>>>>>>> this case, how are efficient deletes handled? For windowed-store, we can
> >>>>>>>>>>> just delete a full RocksDB.
> >>>>>>>>>>>
> >>>>>>>>>>> (8) Rejected alternatives: you propose to not return the validTo
> >>>>>>>>>>> timestamp -- if we find it useful in the future to return it, would
> >>>>>>>>>>> there be a clean path to change it accordingly?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 11/16/22 9:57 PM, Victoria Xia wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a proposal for introducing versioned state stores in Kafka Streams.
> >>>>>>>>>>>> Versioned state stores are similar to key-value stores except they can
> >>>>>>>>>>>> store multiple record versions for a single key. This KIP focuses on
> >>>>>>>>>>>> interfaces only in order to limit the scope of the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Victoria