Thanks for writing this up Matthias. I'm also thinking that maybe we can allow customized store types to be replaced as a single config knob.
I'm even thinking if this can be extended to processor API besides DSL as well: e.g. if we allow classes like "KeyValueStoreBuilder" to take a constructor like KeyValueStoreBuilder(StoreImplementation) Then the config would then be used for StreamsBuilder#build, while for processor API user can pass in impl for `Topology#addStateStore`; of course if users want they can pass in different impl classes which we cannot forbid, but at least processor API users can still partially benefit from this (also maybe we can rename the config to "default.store.impl.class"?). Guozhang On Tue, Apr 14, 2020 at 5:44 PM Matthias J. Sax <mj...@apache.org> wrote: > While writing the KIP, the idea about custom stores also came to my > mind, but I thought it could be done in a follow up KIP. However, given > that two people asked about it, it might be better to just do it right > away. Using a "super supplier interface" instead of an String config > seems to be the more natural choice and we would not introduce a config > what we might later deprecate again. > > Doing a POC sounds good to me. I'll start working on it and let you know > when I have something ready. I am not worried to much about generics, > but John's suggestion to get rid of the API that accepts StoreSuppliers > is larger scope and it's unclear to me atm how it will unfold. > > > -Matthias > > On 4/14/20 11:24 AM, John Roesler wrote: > > Hi all, > > > > Thanks for starting this, Matthias! I've had multiple people mention this > > feature request to me. Actually, the most recent such request was from > > someone developing an LMDB-backed set of store implementations, as > > a drop-in replacement for RocksDB, so Sophie's suggestion seems > > relevant. > > > > What do you think, instead of defining a StoreType enum, of defining > > an interface like: > > vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv > > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > > public interface StoreImplementation { > > KeyValueBytesStoreSupplier keyValueSupplier( > > String name > > ); > > > > WindowBytesStoreSupplier windowBytesStoreSupplier( > > String name, > > Duration retentionPeriod, > > Duration windowSize, > > boolean retainDuplicates > > ); > > > > SessionBytesStoreSupplier sessionBytesStoreSupplier( > > String name, > > Duration retentionPeriod > > ); > > } > > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > > > > Then the default.dsl.store.type you proposed would take a class name > instead, > > with the requirement that the class given must implement > StoreImplementation, > > and it must also have a zero-arg constructor so we can reflectively > instantiate it. > > > > The interface above is compatible with the existing "store supplier" > "interface" > > we have loosely defined in Stores. For veracity's sake, here's how we > could > > implement it: > > > > vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv > > public class RocksDBStoreImplementation implements StoreImplementation { > > > > @Override > > public KeyValueBytesStoreSupplier keyValueSupplier(final String > name) { > > return Stores.persistentTimestampedKeyValueStore(name); > > } > > > > @Override > > public WindowBytesStoreSupplier windowBytesStoreSupplier(final > String name, > > final > Duration retentionPeriod, > > final > Duration windowSize, > > final > boolean retainDuplicates) { > > return Stores.persistentTimestampedWindowStore(name, > retentionPeriod, windowSize, retainDuplicates); > > } > > > > @Override > > public SessionBytesStoreSupplier sessionBytesStoreSupplier(final > String name, final Duration retentionPeriod) { > > return Stores.persistentSessionStore(name, retentionPeriod); > > } > > } > > > > > > public class InMemoryStoreImplementation implements StoreImplementation { > > > > @Override > > public KeyValueBytesStoreSupplier keyValueSupplier(final String > name) { > > return Stores.inMemoryKeyValueStore(name); > > } > > > > @Override > > public WindowBytesStoreSupplier windowBytesStoreSupplier(final > String name, > > final > Duration retentionPeriod, > > final > Duration windowSize, > > final > boolean retainDuplicates) { > > return Stores.inMemoryWindowStore(name, retentionPeriod, > windowSize, retainDuplicates); > > } > > > > @Override > > public SessionBytesStoreSupplier sessionBytesStoreSupplier(final > String name, final Duration retentionPeriod) { > > return Stores.inMemorySessionStore(name, retentionPeriod); > > } > > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > > > > I liked your suggestion to add a new Materialized overload, as long as > the > > generics work out. I think we'd have to actually experiment with it to > make > > sure (might be nice to do this in a POC PR, rather than having to amend > the > > KIP later, but it's your call). > > > > In fact, I have also gotten a lot of feedback that our > StoreBuilder/StoreSupplier/ > > Stores/Materialized/etc. all amount to a pretty confusing ball of code > for users. > > It seems like this suggestion is a good opportunity to clear out a lot > of the > > confusion, by deprecating all the StoreSupplier methods in Stores, as > well > > as the other StoreSupplier methods on Materialized, and just converging > on > > passing around the StoreImplementation. > > > > It seems like this general strategy actually nets a few benefits beyond > just > > being able to swap in a different "default" store implementation: > > * It relieves users from having to specify the kind of store > (KV/Window/Session) whenever the really just wanted to specify the > implementation. Offhand, I don't think there's any situation in which you > can "choose" which kind of store to use, it's always dictated by the > topology, so it's purely a paper cut opportunity as-is. > > * It allows Streams to select the kind of store that it actually needs. > E.g., it opens up a future opportunity for us to correctly choose to use a > Windowed store everywhere downstream of a windowing operation. > > > > Thanks for proposing this KIP! I think it'll be a great addition to > Streams. > > -John > > > > On Mon, Apr 13, 2020, at 22:56, Sophie Blee-Goldman wrote: > >> Hey Matthias, > >> > >> Thanks for picking this up! This'll be really nice for testing in > >> particular. > >> > >> My only question is, do we want to make this available for use with > custom > >> state stores as well? I'm not sure how common custom stores are in > practice, > >> but I imagine when they *are* used, they're likely to be used all > >> throughout the > >> topology. So being able to set this one config would probably be a big > win. > >> > >> That said, it would be a nontrivial change given the different store > types. > >> It's unfortunate that we can't just accept a StoreSupplier class to > >> configure this; > >> we'd need one for KV, window, and session stores each. We could just > >> add three configs, but that's not very appealing when it should take > one. > >> > >> Maybe we could define a new "store supplier"-supplier type class, which > >> maps the store supplier for each of the three store types? Just > throwing out > >> ideas. > >> > >> I'm actually fine with passing on the custom state stores for this > feature > >> if > >> it doesn't sound worth the effort -- just wanted to put the thought out > >> there, > >> and see if anyone comes up with a more elegant solution. > >> > >> Thanks for the KIP! > >> Sophie > >> > >> On Thu, Apr 9, 2020 at 3:50 PM Matthias J. Sax <mj...@apache.org> > wrote: > >> > >>> Hi, > >>> > >>> I would like to propose a small KIP to simplify the switch from RocksDB > >>> to in-memory stores in Kafka Stream: > >>> > >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type > >>> > >>> Looking forward to your feedback. > >>> > >>> > >>> -Matthias > >>> > >>> > >> > > -- -- Guozhang