Hi John,

Thank you. I think it makes sense to modify the KIP to add prefixScan() to the existing interfaces and to list the new mixin behaviour under Rejected Alternatives. I am not very familiar with stores other than KeyValueStore, so is it fine if I keep it there for now?
Regarding the definition of the prefix type, I will try to think about some alternatives and share them if I get any. Thanks! Sagar. On Sun, May 31, 2020 at 1:55 AM John Roesler <vvcep...@apache.org> wrote: > Hi Sagar, > > Thanks for the response. Your use case makes sense to me; I figured it > must be something like that. > > On a pragmatic level, in the near term, you might consider basically doing > the same thing we did in KIP-213. If you swap out the store types for > Bytes/byte[] and “manually” invoke the serdes in your own logic, then you > can use the same algorithm we did to derive the range scan boundaries from > your desired prefix. > > For the actual KIP, it seems like we would need significant design > improvements to be able to do any mixins, so I think we should favor > proposing either to just add to the existing interfaces or to create brand > new interfaces, as appropriate, for now. Given that a prefix can be converted > to a range query at a low level, I think we can probably explore adding > prefix support to the existing interfaces with a default implementation. > > It seems like that just leaves the question of how to define the type of > the prefix. To be honest, I don’t have any great ideas here. Are you able > to generate some creative solutions, Sagar? > > Thanks, > John > > On Tue, May 26, 2020, at 06:42, Sagar wrote: > > Hi John, > > > > Thanks for the detailed reply. I was a bit crammed with work last week, so I > > couldn't respond earlier; apologies for that. > > > > First of all, thanks for the context that both you and Adam have > > provided me on the issues faced previously. As I can clearly see, while I > > was able to cut some corners while writing some test cases or benchmarks, > > stitching together a store with prefix scan into an actual > > topology needs more work. I am sorry for the half-baked tests that I wrote > > without realising it, and you have rightly put it when you said these > > challenges aren't obvious up front.
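John's suggestion above — deriving range-scan boundaries from the serialized prefix, as KIP-213 did — can be sketched in a few lines. This is a hedged illustration of the general technique (increment the last non-0xFF byte of the prefix to get the scan's upper boundary), not the actual Kafka code; the class and method names here are invented for the example.

```java
import java.util.Arrays;

public class PrefixRangeSketch {
    // Smallest byte string strictly greater than every key that starts
    // with `prefix`: increment the last non-0xFF byte and truncate there.
    // Returns null when the prefix is all 0xFF bytes (scan to end of store).
    static byte[] upperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++;
                return bound;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // A range scan from `prefix` up to upperBound(prefix) covers exactly
        // the keys whose serialized form starts with "foo".
        if (!Arrays.equals(upperBound(new byte[] {'f', 'o', 'o'}), new byte[] {'f', 'o', 'p'}))
            throw new AssertionError("expected fop");
        if (!Arrays.equals(upperBound(new byte[] {'a', (byte) 0xFF}), new byte[] {'b'}))
            throw new AssertionError("expected b");
        if (upperBound(new byte[] {(byte) 0xFF}) != null)
            throw new AssertionError("expected null");
        System.out.println("ok");
    }
}
```

With a Bytes/byte[] store, something like `store.range(Bytes.wrap(prefix), Bytes.wrap(upperBound(prefix)))` then approximates a prefix seek; note that since `range()` is inclusive on both ends, the caller may need to prefix-check the final entry.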
> > > > Now, coming back to the other points, I spent some time going through > > KIP-213 and also some of the code snippets that are talked about in that > > KIP. With the detailed explanation that you provided, it is now obvious to > > me that keeping a generic type for keys like K won't work out of the box, and hence a > > decision was made to use Bytes as the key type. > > > > I just had another thought on this though. I looked at the range function > > that was added in ReadOnlyKeyValueStore. While the Key and the Value > > mentioned in that method are generic, internally almost all queries end up > > querying using Bytes in some form or the other. I looked at not just the > > RocksDB store but other stores like the InMemory store or MemoryLRU, and this > > seems to be the pattern. I think this stems from the fact that these stores, > > while implementing KeyValueStore, pass Bytes, byte[] as the K and V > > types. Classes like MeteredKeyValueStore which don't do this still use Bytes.wrap > > to wrap the passed keys and values and invoke the range method. > > > > So, the point I am trying to make is, with the same behaviour - and > > ignoring for a moment that it's a separate interface which I am trying to > > "mix in" - the issues with the key types could be resolved. I may be wrong, > > though, so I would like to know your thoughts on this. In fact, unknowingly, the > > interface implementation of PrefixSeekableStore in RocksDBStateStore also > > passes Bytes and byte[] as K and V. > > > > The second part is exposing it via the publicly accessible interfaces to > > which we downcast while building the topology (like KeyValueStore); I can > > clearly see now that mixing it in the way I tried won't work. My intention > > all along was not to hamper the flow of those stores which don't support > > prefix scan as yet, and hence the separate interface. But I agree that for > > this to work, it needs to be part of some pre-defined store types like > > KVStore etc.
Right now, I don't have an answer to this, but mostly it > would > > have to be moved there and implemented across all stores (if we see the > > worth in prefix scans :) ) > > > > Regarding the motivation, I am sorry if I wasn't clear. This originated > > from one of my own use cases with Kafka Streams where I needed to find some > > keys based upon a certain prefix. In fact, it's similar to the > > RangeScanCombinedKeyUsage diagram in KIP-213 where the otherTable tries to > > find entries in the state store based upon the FK. I was using > > KeyValueStore to be precise. I also remember having a Slack conversation on > > this, and I was told that this isn't supported right now, but some other > > users shared their experiences on how with some hacks they are able to > > perform prefix scans even though their use case fits the bill for a prefix > > scan. That kind of motivated me to take a stab at it. Unfortunately, I have > > lost the Slack chat because of some cleanup at the Slack channel level. I > > will try and update the ambiguous motivation statement in the near future. > > > > Lastly, I would like to point out that your response was not at all > > discouraging. On the contrary, it was really insightful, and it's always good > > to learn/discover new things :) > > > > Thanks! > > Sagar. > > > > On Fri, May 15, 2020 at 7:37 AM John Roesler <vvcep...@apache.org> > wrote: > > > > > Hi, Sagar! > > > > > > Thanks for this KIP. I'm sorry it took me so long to reply. I'll number my > > > points differently to avoid confusion. > > > > > > I can provide some additional context on the difficulties we previously > > > faced in KIP-213 (which you and Adam have already discussed). > > > > > > J1) In your KIP, you propose the following interface:
> > >
> > > public interface PrefixSeekableStore<K, V> {
> > >     KeyValueIterator<K, V> prefixSeek(K prefix);
> > > }
> > >
> > > This is roughly the same thing that Adam and I were considering before.
It has a hidden problem, that it assumes that prefixes of > > > keys in the key space are also in the key space. In other words, this > > > is a store with key type K, and the API assumes that prefixes are also > > > of type K. This is true for some key types, like String or Bytes, but > not > > > for others. > > > > > > For example, if the keys are UUIDs, then no prefix is also a UUID. If > the > > > key is a complex data type, like Windowed<K> in our own DSL, then > > > we would absolutely want to query all keys with the same record key > > > (the K part), or the same window start time, but in neither case is the > > > prefix actually a Windowed<K>. > > > > > > You can skirt the issue by defining a third type parameter, maybe KP, > that > > > is the "prefix" type, but this would also be awkward for many usages. > > > > > > J2) There is a related problem with serialization. Whether something > > > is a prefix or not depends not on the Java key (K), but on the binary > > > format that is produced when you use a serde on the key. Whether > > > we say that the prefix must also be a K or whether it gets its own > type, > > > KP, there are problems. > > > > > > In the latter case, we must additionally require a second set of serdes > > > for the prefixes, but there's no obvious way to incorporate this in the > > > API, especially not in the DSL. > > > > > > In either case, for the API to actually work, we need to know ahead > > > of time that the Serde will produce a binary key that starts with the > > > part that we wish to use as a prefix. For example, what we were doing > > > briefly in KIP-213 (where we had complex keys, similar to Windowed<K>) > > > was to define "dummy" values that indicate that a Windowed<K> is > actually > > > just a prefix key, not a real key. Maybe the window start time would be > > > null or the key part would be null. 
But we also had to define a serde > > > that would very specifically anticipate which component of the complex > > > key would need to be used in a prefix key. Having to bring all these > > > parts together in a reliable, easy-to-debug fashion gives me some doubt > > > that people would actually be able to use this feature in complicated > > > programs without driving themselves crazy. > > > > > > J3) Thanks so much for including benchmarks and tests! Unfortunately, > > > these don't include everything you need to really plug into the Streams > > > API. I think when you push it a little farther, you'll realize what Adam > > > was talking about wrt the interface difficulties. > > > > > > In your benchmark and tests, you directly construct the store and then > > > use it, but in a real Streams application, you can only provide your > > > implementation in a StoreSupplier, for example via the Materialized > > > parameter. Then, to use the store from inside a Processor, you'd have > > > to get it by name from the ProcessorContext, and then cast it to one of > > > the pre-defined store types, KeyValueStore, WindowedStore, or > > > SessionStore. It won't work to "mix in" your interface because the > > > processor gets a store that's wrapped in layers that handle serialization, > > > change-logging, recording metrics, and caching. > > > > > > To use the store through IQ, you have to provide a QueryableStoreType > > > to KafkaStreams#store, and you get back a similarly wrapped store. > > > > > > I think our only choices to add an interface like yours are either to add > > > it to one of the existing store types, like KeyValueStore or > > > WindowedStore, or to define a completely new store hierarchy, meaning > > > you have to duplicate all the "wrapper" layers in Streams.
> > > > > > I think if you write an "end-to-end" test, where you write a Streams app, > > > provide your store, and then use it in a Processor and through IQ, > > > you'll see what I'm talking about. > > > > > > IIRC, those three points were the ones that ultimately led us to abandon > > > the whole idea last time and just register the stores with key type Bytes. > > > I think some creative solutions may yet be possible, but it'll take some > > > more design work to get there. > > > > > > Can I ask what your motivation is, exactly, for proposing this feature? > > > The motivation just says "some users may want to do it", which has > > > the advantage that it's impossible to disagree with, but doesn't provide > > > a lot of concrete detail ;) > > > > > > Specifically, what I'm wondering is whether you wanted to use this as > > > part of a KeyValue store, which might be a challenge, or whether you > > > wanted to use it for more efficient scans in a WindowedStore, like > > > Guozhang. > > > > > > Thanks again for the KIP! I hope my response isn't too discouraging; > > > I just wanted to convey the challenges we faced last time, since they > > > are all not obvious up front. > > > > > > Best regards, > > > -John > > > > > > > > > On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote: > > > > Whoops, I guess I didn't finish reading the KIP all the way to the end > > > > earlier. Thanks > > > > for including the link to the RocksDB PR in the KIP! > > > > > > > > I have one additional question about the proposal: do you plan to also add > > > > this > > > > prefix seek API to the dual column family iterators? These are used by > > > > RocksDBTimestampedStore (which extends RocksDBStore), for example the > > > > *RocksDBDualCFRangeIterator* > > > > > > > > Thanks for the KIP!
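John's J1/J2 points above — that a prefix is often not itself a K, and that prefix matching really happens on serialized bytes — can be made concrete with a toy in-memory store. All names here are hypothetical (not from the KIP or from Kafka): the key type K and the prefix type KP are distinct type parameters, each with its own "serializer", and the two serializers must agree byte-for-byte on the shared part of the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Toy illustration of the "third type parameter" idea: K and KP differ,
// and matching happens on the serialized (here: String) form.
class ToyPrefixStore<K, KP, V> {
    private final TreeMap<String, V> data = new TreeMap<>();
    private final Function<K, String> keySer;
    private final Function<KP, String> prefixSer;

    ToyPrefixStore(Function<K, String> keySer, Function<KP, String> prefixSer) {
        this.keySer = keySer;
        this.prefixSer = prefixSer;
    }

    void put(K key, V value) { data.put(keySer.apply(key), value); }

    // All values whose serialized key starts with the serialized prefix.
    List<V> prefixSeek(KP prefix) {
        String p = prefixSer.apply(prefix);
        List<V> out = new ArrayList<>();
        for (Map.Entry<String, V> e : data.tailMap(p).entrySet()) {
            if (!e.getKey().startsWith(p)) break; // left the prefix region
            out.add(e.getValue());
        }
        return out;
    }
}

public class ToyPrefixStoreDemo {
    // A Windowed<K>-like composite key; a prefix query uses only the String part.
    record WindowedKey(String key, long windowStart) {}

    public static void main(String[] args) {
        // K = WindowedKey, KP = String: the prefix is *not* itself a key.
        // The '@' delimiter keeps "user1" from also matching "user10".
        ToyPrefixStore<WindowedKey, String, Integer> store = new ToyPrefixStore<>(
                w -> w.key() + "@" + w.windowStart(),
                p -> p + "@");
        store.put(new WindowedKey("user1", 100L), 1);
        store.put(new WindowedKey("user10", 100L), 2);
        store.put(new WindowedKey("user1", 200L), 3);
        System.out.println(store.prefixSeek("user1"));  // prints [1, 3]
        System.out.println(store.prefixSeek("user10")); // prints [2]
    }
}
```

The same shape works when K is a UUID and KP is, say, a byte[] fragment — which is exactly the case J1 says the single-type-parameter interface cannot express.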
> > > > > > > > On Thu, May 14, 2020 at 10:50 AM Sagar <sagarmeansoc...@gmail.com> > > > wrote: > > > > > > > > > Hey @Adam, > > > > > > > > > > Thanks for sharing your experience with using prefix seek. I did look at > > > > > your code for RocksDBPrefixIterator; in fact, I have repurposed that class > > > > > itself since it wasn't being used elsewhere. Regarding how I plan to > > > > > expose them throughout the state stores, what I have tried to do is > > > add it > > > > > as a separate interface. So, basically, it is not at the same level as the > > > > > *range* function, so to speak. The reason I did that is that currently I feel not > > > > > all state stores are a natural fit for prefix seek. As I mentioned in the > > > > > KIP as well, the current equivalent to it could be BulkLoadingStore (not in > > > > > terms of functionality but in terms of how it is also not implemented by > > > > > all of them). So, that way, I don't need to stub them across all the > > > > > state stores, and we can implement it only where needed. For example, in the > > > > > PR that I have put for reference in the KIP, you can see that I have it > > > > > implemented only for RocksDB. > > > > > > > > > > @Guozhang, > > > > > > > > > > Thanks for the feedback. Those are very interesting questions, and I will > > > > > try my best to answer based upon whatever limited understanding I have > > > > > developed so far :) > > > > > > > > > > 1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I hadn't > > > > > looked at that config. I did look it up after you pointed it out, and it seems > > > > > it's more for hash-based memtables? I may be wrong though. But what I would > > > > > say is that the changes I had made were not exactly from a correctness > > > > > standpoint but more from trying to showcase how we can implement these > > > > > changes.
The idea was that once we see the merit in this approach, then we > > > > > can add some of the tunings (and I would need your team's assistance there > > > > > :D). > > > > > > > > > > 2) Regarding the similarity of `RocksDBPrefixIterator` and > > > > > `RocksDBRangeIterator`, yes, the implementations look more or less similar. > > > > > So, in terms of performance, they might be similar. But semantically, they > > > > > can solve 2 different use-cases. The range seek is useful when we know both > > > > > from and to. But if we consider use-cases where we want to find keys with a > > > > > certain prefix, but we don't know what its start and end are, then > > > > > prefix seek would come in more handy. The point that I am trying to make is > > > > > that it can extend the scope of state stores from just point lookups to > > > > > somewhat being able to do speculative queries whereby users can search if a > > > > > certain pattern exists. I can vouch for this personally because I wanted to > > > > > use state stores for one such use case, and since this option wasn't there, > > > > > I had to do some other things. An equivalent to this could be the SCAN operator > > > > > in Redis. (Not trying to compare Redis and state stores but trying to > > > > > give some context). > > > > > > > > > > Regarding the point on bloom filters, I think there are certain > > > > > optimisations that are being talked about in the case of prefix seek here: > > > > > > > > > > > > > > > > > > > https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key > > > > > Again, > > > > > this isn't something that I have explored fully. Also, on the prefix seek > > > > > page on RocksDB, they mention that there's a prefix iterating technique > > > > > called Prefix Bloom Filter.
> > > > > > > > > > 3) Regarding the question on the length of bytes for seek vs. prefix seek, I am > > > > > not entirely sure about that scenario. What I have understood is that, > > > > > at least for RocksDB, it is more performant for short iterator queries > > > > > than longer ones. > > > > > > > > > > 4) Regarding the last question on placing it within Segment, the reason I > > > > > didn't do it that way is that I thought we shouldn't tie this feature only to > > > > > RocksDB. I agree that I got this idea while looking/reading about RocksDB, > > > > > but if we keep it outside the purview of RocksDB and keep it as a pluggable > > > > > entity, then a) it remains generic by not being tied to any specific store, > > > > > and b) no change is needed at all for any of the other stores which haven't > > > > > implemented it. > > > > > > > > > > I am not sure if any of the above points make sense, but as I said, this is > > > > > based on my limited understanding of the codebase. So, pardon any > > > > > incorrect/illogical statements plz! > > > > > > > > > > @Sophie, > > > > > > > > > > Thanks for bringing that point up! I have mentioned that PR in the > > > > > KIP under a section called Other Considerations. Nonetheless, thanks for > > > > > pointing it out! > > > > > > > > > > Thanks! > > > > > Sagar. > > > > > > > > > > > > > > > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman < > > > sop...@confluent.io> > > > > > wrote: > > > > > > > > > > > Not to derail this KIP discussion, but to leave a few notes on some of the > > > > > > RocksDB points that have come up: > > > > > > > > > > > > Someone actually merged some long-overdue performance improvements to > > > > > > the RocksJava implementation (the PR was opened back in 2017! yikes).
> > > > > > I haven't looked into the prefix seek API closely enough to know how > > > > > > relevant this particular change is, and they are still improving things, but it > > > > > > gives me some faith. > > > > > > > > > > > > There are some pretty promising results reported on the PR: > > > > > > https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037 > > > > > > > > > > > > Regarding the custom comparator, they also recently merged this performance > > > > > > improvement <https://github.com/facebook/rocksdb/pull/6252>. The tl;dr is > > > > > > they reduced the slowdown of a custom comparator in Java > > > > > > (relative to the native C++) from ~7x to ~5.2x at best. Which is still not > > > > > > great, but it > > > > > > would be interesting to run our own benchmarks and see how this stacks up. > > > > > > > > > > > > Of course, these are all new changes, and as such will require us to upgrade > > > > > > rocks to 6.x, which means they have to wait for us to release a 3.0. But > > > > > > there's > > > > > > some talk about 3.0 coming in the next few releases, so consider it food for > > > > > > not-so-future thought > > > > > > > > > > > > > > > > > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare < > > > adam.bellem...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > Hi Guozhang > > > > > > > > > > > > > > For clarity, the issues I was running into were not about the actual > > > > > > > *prefixSeek* function itself, but about exposing it at the same level of > > > > > > > access as the *range* function throughout Kafka Streams. It required a lot > > > > > > > of changes, and also required that most state stores stub it out since it > > > > > > > wasn't clear how they would implement it.
It was basically an > > > > > > overreaching API change that was easily solved (for the specific prefix-scan in FKJ) by > > > simply using *range*. So, to be clear, the blockers were predominantly > > > around correctly handling the API changes, nothing to do with the > > > mechanisms of the RocksDB prefix scanning. > > > > > > > As for KAFKA-5285, I'll look into it more to see if I can get a better > > > handle on the problem! > > > > > > > Hope this helps clear it up. > > > > > > > Adam > > > > > > > > > > > > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <wangg...@gmail.com> > > > wrote: > > > > > > > > Hello Adam, > > > > > > > > > > > > > > > > I'm wondering if you can provide a bit more context on the blockers of > > > > using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class, > > > > but it is not used anywhere yet)? I'm currently looking at ways to allow some > > > > secondary indices with RocksDB following some existing approaches > > > > from CockroachDB etc., so I'm very curious to learn your experience. > > > > > > > > > > > > > > > > 1) Before considering any secondary indices, a quick thought is that for > > > > (key, timeFrom, timeTo) queries, we can easily replace the current > > > > `range()` impl with a `prefixRange()` impl via a prefix iterator; though > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed, > > > > and hence the existing `range()` impl may still be used.
> > > > > > > > > > > > > > > > 2) Another related issue I've been pondering for a while is > > > > > > > > around KAFKA-5285: with the default lexicographic byte comparator, since the > > > > > > > > key length varies, the combo (key, window) would have interleaving byte > > > > > > > > layouts like:
> > > > > > > >
> > > > > > > > AAA0001 (key AAA, timestamp 0001)
> > > > > > > > AAA00011 (key AAA0, timestamp 0011)
> > > > > > > > AAA0002 (key AAA, timestamp 0002)
> > > > > > > >
> > > > > > > > which is challenging for prefix seeks to work efficiently. Although we can > > > > > > > > override the byte comparator in JNI, it is very expensive, and the cost of > > > > > > > > JNI overwhelms its benefits. If you've got some ideas around it, please lmk > > > > > > > > as well. > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare < > > > > > > adam.bellem...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > Hi Sagar > > > > > > > > > > > > > > > > > > I implemented a very similar interface for KIP-213, the foreign-key > > > > > > > > joiner. > > > > > > > > > We pulled it out of the final implementation and instead used RocksDB > > > > range. > > > > > > > > > You can see the particular code where we use > > > > > > > RocksDB.range(...) > > > > > > > > to > > > > > > > > > get the same iterator result.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95 > > > > > > > > > > > > > > > > > > We pulled it out because there were numerous awkward > > > acrobatics to > > > > > > > > > integrate *prefixSeek()* function into the Kafka Streams > code. > > > > > > > > Basically, I > > > > > > > > > wanted to be able to access *prefixSeek()* the same way I > can > > > > > access > > > > > > > > > *range()* for any state store, and in particular use it for > > > storing > > > > > > > data > > > > > > > > > with a particular foreign key (as per the previous URL). > > > However, I > > > > > > > found > > > > > > > > > out that it required way too many changes to expose the > > > > > > *prefixSeek()* > > > > > > > > > functionality while still being able to leverage all the > nice > > > Kafka > > > > > > > > Streams > > > > > > > > > state management + supplier functionality, so we made a > > > decision > > > > > just > > > > > > > to > > > > > > > > > stick with *range()* and pull everything else out. > > > > > > > > > > > > > > > > > > I guess my question here is, how do you anticipate using > > > > > > *prefixSeek()* > > > > > > > > > within the framework of Kafka Streams, or the Processor > API? 
> > > > > > > > > > > > > > > > > > Adam > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar < > > > sagarmeansoc...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi All, > > > > > > > > > > > > > > > > > > > > I would like to start a discussion on the KIP that I > created > > > > > below > > > > > > to > > > > > > > > add > > > > > > > > > > prefix scan support in State Stores: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > Sagar. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
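Guozhang's KAFKA-5285 interleaving example above is easy to reproduce in plain Java with no Kafka dependencies. The sketch below demonstrates the interleaving and one commonly discussed workaround — padding keys to a fixed width before appending the timestamp. The fixed-width padding is an assumption of this example, not something proposed in the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InterleaveDemo {
    // Pad a key to a fixed width with NUL bytes (assumes keys contain no
    // NULs and never exceed the width) so lexicographic order groups by
    // key before comparing timestamps.
    static String pad(String key) {
        StringBuilder sb = new StringBuilder(key);
        while (sb.length() < 8) sb.append('\u0000');
        return sb.toString();
    }

    public static void main(String[] args) {
        // Variable-length keys with timestamps appended, as in KAFKA-5285:
        List<String> raw = new ArrayList<>(List.of(
                "AAA" + "0001",   // key AAA,  timestamp 0001
                "AAA0" + "0011",  // key AAA0, timestamp 0011
                "AAA" + "0002")); // key AAA,  timestamp 0002
        Collections.sort(raw); // the byte order a store's iterator would see
        // key AAA0's entry sorts between AAA's two entries, so AAA's entries
        // are not contiguous and a naive prefix seek on "AAA" misfires:
        System.out.println(raw); // prints [AAA0001, AAA00011, AAA0002]

        // With fixed-width keys, all of AAA's entries sort before AAA0's:
        List<String> padded = new ArrayList<>(List.of(
                pad("AAA") + "0001", pad("AAA0") + "0011", pad("AAA") + "0002"));
        Collections.sort(padded);
        System.out.println(padded.get(2).startsWith(pad("AAA0"))); // prints true
    }
}
```

This is also why overriding the comparator (the expensive JNI route Guozhang mentions) is attractive only in principle: a key-aware byte layout achieves the same grouping without touching the comparator.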