While writing the KIP, the idea of supporting custom stores also came to my
mind, but I thought it could be done in a follow-up KIP. However, given
that two people asked about it, it might be better to just do it right
away. Using a "super supplier interface" instead of a String config
seems to be the more natural choice, and we would not introduce a config
that we might later deprecate again.

Doing a POC sounds good to me. I'll start working on it and let you know
when I have something ready. I am not worried too much about generics,
but John's suggestion to get rid of the API that accepts StoreSuppliers
is larger in scope and it's unclear to me atm how it will unfold.


-Matthias

On 4/14/20 11:24 AM, John Roesler wrote:
> Hi all,
> 
> Thanks for starting this, Matthias! I've had multiple people mention this
> feature request to me. Actually, the most recent such request was from
> someone developing an LMDB-backed set of store implementations, as
> a drop-in replacement for RocksDB, so Sophie's suggestion seems
> relevant.
> 
> What do you think, instead of defining a StoreType enum, of defining
> an interface like:
> vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
> public interface StoreImplementation {
>     KeyValueBytesStoreSupplier keyValueSupplier(
>         String name
>     );
> 
>     WindowBytesStoreSupplier windowBytesStoreSupplier(
>         String name,
>         Duration retentionPeriod,
>         Duration windowSize,
>         boolean retainDuplicates
>     );
> 
>     SessionBytesStoreSupplier sessionBytesStoreSupplier(
>         String name,
>         Duration retentionPeriod
>     );
> }
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> 
> Then the default.dsl.store.type you proposed would take a class name instead,
> with the requirement that the class given must implement StoreImplementation,
> and it must also have a zero-arg constructor so we can reflectively 
> instantiate it.
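
To make the reflective instantiation concrete, here is a minimal sketch. It
is self-contained rather than real Kafka Streams code: the nested
StoreImplementation interface is a one-method stand-in for the interface
quoted above, and StoreImplementationLoader, DemoStoreImplementation, load(),
and describe() are hypothetical names used only for illustration. The config
key is the one proposed in the KIP, assumed here to hold a class name.

```java
import java.util.Map;

final class StoreImplementationLoader {

    // Config key from the KIP discussion; assumed here to hold a class name.
    static final String CONFIG_KEY = "default.dsl.store.type";

    // Stand-in for the proposed interface, reduced to one method so the
    // sketch compiles without Kafka Streams on the classpath.
    interface StoreImplementation {
        String describe();
    }

    // Hypothetical implementation; its implicit zero-arg constructor is what
    // makes reflective instantiation possible.
    static final class DemoStoreImplementation implements StoreImplementation {
        @Override
        public String describe() {
            return "demo";
        }
    }

    // Looks up the configured class name, checks that the class implements
    // StoreImplementation, and calls its zero-arg constructor.
    static StoreImplementation load(final Map<String, String> config) {
        final String className = config.get(CONFIG_KEY);
        if (className == null) {
            throw new IllegalArgumentException("Missing config: " + CONFIG_KEY);
        }
        try {
            return Class.forName(className)
                .asSubclass(StoreImplementation.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (final ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException(
                className + " must implement StoreImplementation"
                    + " and have a zero-arg constructor", e);
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> config =
            Map.of(CONFIG_KEY, DemoStoreImplementation.class.getName());
        System.out.println(load(config).describe());
    }
}
```

Mapping every failure to one exception type is just a choice for the sketch;
the point is that a wrong class name would be caught at startup.
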
> 
> The interface above is compatible with the existing "store supplier" "interface"
> we have loosely defined in Stores. For veracity's sake, here's how we could
> implement it:
> 
> vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
> public class RocksDBStoreImplementation implements StoreImplementation {
> 
>     @Override
>     public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
>         return Stores.persistentTimestampedKeyValueStore(name);
>     }
> 
>     @Override
>     public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
>                                                              final Duration retentionPeriod,
>                                                              final Duration windowSize,
>                                                              final boolean retainDuplicates) {
>         return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
>     }
> 
>     @Override
>     public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name,
>                                                                final Duration retentionPeriod) {
>         return Stores.persistentSessionStore(name, retentionPeriod);
>     }
> }
> 
> 
> public class InMemoryStoreImplementation implements StoreImplementation {
> 
>     @Override
>     public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
>         return Stores.inMemoryKeyValueStore(name);
>     }
> 
>     @Override
>     public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
>                                                              final Duration retentionPeriod,
>                                                              final Duration windowSize,
>                                                              final boolean retainDuplicates) {
>         return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
>     }
> 
>     @Override
>     public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name,
>                                                                final Duration retentionPeriod) {
>         return Stores.inMemorySessionStore(name, retentionPeriod);
>     }
> }
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> 
> I liked your suggestion to add a new Materialized overload, as long as the
> generics work out. I think we'd have to actually experiment with it to make
> sure (might be nice to do this in a POC PR, rather than having to amend the
> KIP later, but it's your call).
> 
> In fact, I have also gotten a lot of feedback that our
> StoreBuilder/StoreSupplier/Stores/Materialized/etc. all amount to a pretty
> confusing ball of code for users.
> It seems like this suggestion is a good opportunity to clear out a lot of the
> confusion, by deprecating all the StoreSupplier methods in Stores, as well
> as the other StoreSupplier methods on Materialized, and just converging on
> passing around the StoreImplementation.
> 
> It seems like this general strategy actually nets a few benefits beyond just
> being able to swap in a different "default" store implementation:
> * It relieves users from having to specify the kind of store
>   (KV/Window/Session) when they really just wanted to specify the
>   implementation. Offhand, I don't think there's any situation in which you
>   can "choose" which kind of store to use; it's always dictated by the
>   topology, so it's purely a paper cut opportunity as-is.
> * It allows Streams to select the kind of store that it actually needs. E.g.,
>   it opens up a future opportunity for us to correctly choose to use a
>   Windowed store everywhere downstream of a windowing operation.
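
A rough sketch of the "topology dictates the kind of store" point, under
heavy assumptions: the Operation enum, the InMemory class, and storeFor() are
hypothetical names, and the nested StoreImplementation interface is a stand-in
whose methods return Strings instead of the real supplier types, so the code
runs without Kafka Streams. It only illustrates that the caller picks the
implementation once while the operation picks KV/Window/Session.

```java
import java.time.Duration;

final class StoreKindSelection {

    // Hypothetical: DSL operations that need a state store.
    enum Operation { AGGREGATE, WINDOWED_AGGREGATE, SESSION_AGGREGATE }

    // Stand-in for the proposed interface; each "supplier" is a String here.
    interface StoreImplementation {
        String keyValueSupplier(String name);

        String windowBytesStoreSupplier(String name, Duration retentionPeriod,
                                        Duration windowSize, boolean retainDuplicates);

        String sessionBytesStoreSupplier(String name, Duration retentionPeriod);
    }

    static final class InMemory implements StoreImplementation {
        @Override
        public String keyValueSupplier(final String name) {
            return "in-memory key-value store " + name;
        }

        @Override
        public String windowBytesStoreSupplier(final String name, final Duration retentionPeriod,
                                               final Duration windowSize, final boolean retainDuplicates) {
            return "in-memory window store " + name;
        }

        @Override
        public String sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
            return "in-memory session store " + name;
        }
    }

    // The operation decides the kind of store; the implementation is the
    // only thing the user had to choose.
    static String storeFor(final Operation operation, final String name,
                           final Duration retentionPeriod, final Duration windowSize,
                           final StoreImplementation implementation) {
        switch (operation) {
            case AGGREGATE:
                return implementation.keyValueSupplier(name);
            case WINDOWED_AGGREGATE:
                return implementation.windowBytesStoreSupplier(name, retentionPeriod, windowSize, false);
            case SESSION_AGGREGATE:
                return implementation.sessionBytesStoreSupplier(name, retentionPeriod);
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    public static void main(final String[] args) {
        final StoreImplementation implementation = new InMemory();
        for (final Operation operation : Operation.values()) {
            System.out.println(storeFor(operation, "counts",
                Duration.ofHours(1), Duration.ofMinutes(5), implementation));
        }
    }
}
```
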
> 
> Thanks for proposing this KIP! I think it'll be a great addition to Streams.
> -John
> 
> On Mon, Apr 13, 2020, at 22:56, Sophie Blee-Goldman wrote:
>> Hey Matthias,
>>
>> Thanks for picking this up! This'll be really nice for testing in
>> particular.
>>
>> My only question is, do we want to make this available for use with custom
>> state stores as well? I'm not sure how common custom stores are in practice,
>> but I imagine when they *are* used, they're likely to be used all throughout
>> the topology. So being able to set this one config would probably be a big win.
>>
>> That said, it would be a nontrivial change given the different store types.
>> It's unfortunate that we can't just accept a StoreSupplier class to configure
>> this; we'd need one for KV, window, and session stores each. We could just
>> add three configs, but that's not very appealing when it should take one.
>>
>> Maybe we could define a new "store supplier"-supplier type class, which
>> maps the store supplier for each of the three store types? Just throwing out
>> ideas.
>>
>> I'm actually fine with passing on the custom state stores for this feature if
>> it doesn't sound worth the effort -- just wanted to put the thought out there,
>> and see if anyone comes up with a more elegant solution.
>>
>> Thanks for the KIP!
>> Sophie
>>
>> On Thu, Apr 9, 2020 at 3:50 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I would like to propose a small KIP to simplify the switch from RocksDB
>>> to in-memory stores in Kafka Streams:
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
>>>
>>> Looking forward to your feedback.
>>>
>>>
>>> -Matthias
>>>
>>>
>>
