Hi all,

Thanks for starting this, Matthias! I've had multiple people mention this
feature request to me. Actually, the most recent such request was from
someone developing an LMDB-backed set of store implementations, as
a drop-in replacement for RocksDB, so Sophie's suggestion seems
relevant.

Instead of defining a StoreType enum, what do you think of defining
an interface like:
vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
public interface StoreImplementation {
    KeyValueBytesStoreSupplier keyValueSupplier(
        String name
    );

    WindowBytesStoreSupplier windowBytesStoreSupplier(
        String name,
        Duration retentionPeriod,
        Duration windowSize,
        boolean retainDuplicates
    );

    SessionBytesStoreSupplier sessionBytesStoreSupplier(
        String name,
        Duration retentionPeriod
    );
}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Then the default.dsl.store.type you proposed would take a class name instead,
with the requirement that the class given must implement StoreImplementation,
and it must also have a zero-arg constructor so we can reflectively instantiate 
it.
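
For what it's worth, the reflective instantiation could look roughly like the
sketch below. This is only an illustration under assumptions:
StoreImplementationLoader, load(), and describe() are hypothetical names, and
the StoreImplementation in it is a stripped-down stand-in for the interface
above so that the snippet compiles without Kafka on the classpath.

```java
import java.util.Map;

final class StoreImplementationLoader {

    // Stand-in for the proposed interface. The three supplier methods are
    // replaced by a single hypothetical describe() to keep this self-contained.
    interface StoreImplementation {
        String describe();
    }

    // Example implementation with the required zero-arg constructor.
    static final class InMemoryStoreImplementation implements StoreImplementation {
        public InMemoryStoreImplementation() { }

        @Override
        public String describe() {
            return "in-memory";
        }
    }

    // Config key taken from the KIP discussion; here it holds a class name.
    static final String DEFAULT_DSL_STORE_TYPE_CONFIG = "default.dsl.store.type";

    // Reads the class name from the configs, checks that the class implements
    // StoreImplementation, and instantiates it via its zero-arg constructor.
    static StoreImplementation load(final Map<String, ?> configs) {
        final Object className = configs.get(DEFAULT_DSL_STORE_TYPE_CONFIG);
        if (!(className instanceof String)) {
            throw new IllegalArgumentException(
                DEFAULT_DSL_STORE_TYPE_CONFIG + " must be set to a class name");
        }
        try {
            final Class<?> clazz = Class.forName((String) className);
            if (!StoreImplementation.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                    className + " must implement StoreImplementation");
            }
            return (StoreImplementation) clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            // Covers a missing class, a missing zero-arg constructor, and
            // failures while running the constructor.
            throw new IllegalArgumentException("Could not instantiate " + className, e);
        }
    }
}
```

The point of failing fast with a clear message is that a typo in the config
would surface at startup rather than when the first store is built.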

The interface above is compatible with the existing "store supplier" "interface"
we have loosely defined in Stores. To make this concrete, here's how we could
implement it:

vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
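import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

public class RocksDBStoreImplementation implements StoreImplementation {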

    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentTimestampedKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name,
                                                               final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
}


public class InMemoryStoreImplementation implements StoreImplementation {

    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.inMemoryKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name,
                                                               final Duration retentionPeriod) {
        return Stores.inMemorySessionStore(name, retentionPeriod);
    }
}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I liked your suggestion to add a new Materialized overload, as long as the
generics work out. I think we'd have to actually experiment with it to make
sure (might be nice to do this in a POC PR, rather than having to amend the
KIP later, but it's your call).

In fact, I have also gotten a lot of feedback that our
StoreBuilder/StoreSupplier/Stores/Materialized/etc. all amount to a pretty
confusing ball of code for users. It seems like this suggestion is a good
opportunity to clear out a lot of the confusion, by deprecating all the
StoreSupplier methods in Stores, as well as the other StoreSupplier methods
on Materialized, and just converging on passing around the StoreImplementation.

It seems like this general strategy actually nets a few benefits beyond just
being able to swap in a different "default" store implementation:
* It relieves users from having to specify the kind of store
  (KV/Window/Session) when they really just wanted to specify the
  implementation. Offhand, I don't think there's any situation in which you
  can "choose" which kind of store to use; it's always dictated by the
  topology, so as-is it's purely a paper cut.
* It allows Streams to select the kind of store that it actually needs. E.g.,
  it opens up a future opportunity for us to correctly choose to use a
  Windowed store everywhere downstream of a windowing operation.
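
To illustrate the two points above, here's a rough sketch of Streams choosing
the kind of store itself once it has a StoreImplementation in hand. Everything
beyond the three interface method names is an assumption: StoreKind and
supplierFor() are hypothetical, and the methods return plain Strings instead of
real suppliers so that the snippet runs without Kafka on the classpath.

```java
import java.time.Duration;

final class StoreKindSelectionSketch {

    // Simplified stand-in for the proposed interface: Strings take the place
    // of the KeyValue/Window/Session bytes store suppliers.
    interface StoreImplementation {
        String keyValueSupplier(String name);

        String windowBytesStoreSupplier(String name,
                                        Duration retentionPeriod,
                                        Duration windowSize,
                                        boolean retainDuplicates);

        String sessionBytesStoreSupplier(String name, Duration retentionPeriod);
    }

    static final class InMemoryStoreImplementation implements StoreImplementation {
        @Override
        public String keyValueSupplier(final String name) {
            return "in-memory-kv:" + name;
        }

        @Override
        public String windowBytesStoreSupplier(final String name,
                                               final Duration retentionPeriod,
                                               final Duration windowSize,
                                               final boolean retainDuplicates) {
            return "in-memory-window:" + name;
        }

        @Override
        public String sessionBytesStoreSupplier(final String name,
                                                final Duration retentionPeriod) {
            return "in-memory-session:" + name;
        }
    }

    // Hypothetical: the kind of store an operator needs, as dictated by the
    // topology rather than by the user.
    enum StoreKind { KEY_VALUE, WINDOW, SESSION }

    // The user only supplies the implementation; the kind comes from the
    // topology. retainDuplicates is fixed to false here as a simplification.
    static String supplierFor(final StoreImplementation impl,
                              final StoreKind kind,
                              final String name,
                              final Duration retentionPeriod,
                              final Duration windowSize) {
        switch (kind) {
            case KEY_VALUE:
                return impl.keyValueSupplier(name);
            case WINDOW:
                return impl.windowBytesStoreSupplier(name, retentionPeriod, windowSize, false);
            case SESSION:
                return impl.sessionBytesStoreSupplier(name, retentionPeriod);
            default:
                throw new IllegalStateException("Unknown store kind: " + kind);
        }
    }
}
```

With something along these lines, swapping InMemoryStoreImplementation for an
LMDB- or RocksDB-backed class would not require touching any call site that
decides between KV, window, and session stores.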

Thanks for proposing this KIP! I think it'll be a great addition to Streams.
-John

On Mon, Apr 13, 2020, at 22:56, Sophie Blee-Goldman wrote:
> Hey Matthias,
> 
> Thanks for picking this up! This'll be really nice for testing in
> particular.
> 
> My only question is, do we want to make this available for use with custom
> state stores as well? I'm not sure how common custom stores are in practice,
> but I imagine when they *are* used, they're likely to be used all throughout
> the topology. So being able to set this one config would probably be a big win.
> 
> That said, it would be a nontrivial change given the different store types.
> It's unfortunate that we can't just accept a StoreSupplier class to configure
> this; we'd need one for KV, window, and session stores each. We could just
> add three configs, but that's not very appealing when it should take one.
> 
> Maybe we could define a new "store supplier"-supplier type class, which
> maps the store supplier for each of the three store types? Just throwing out
> ideas.
> 
> I'm actually fine with passing on the custom state stores for this feature if
> it doesn't sound worth the effort -- just wanted to put the thought out there,
> and see if anyone comes up with a more elegant solution.
> 
> Thanks for the KIP!
> Sophie
> 
> On Thu, Apr 9, 2020 at 3:50 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
> > Hi,
> >
> > I would like to propose a small KIP to simplify the switch from RocksDB
> > to in-memory stores in Kafka Stream:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
> >
> > Looking forward to your feedback.
> >
> >
> > -Matthias
> >
> >
>
