Hi,
I have created a simple window store to count occurrences of a given key.


My pipeline is:

        TimeWindows windows = TimeWindows.of(n).advanceBy(n).until(30n);
        final StateStoreSupplier<WindowStore> supplier =
Stores.create("key-table")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .enableLogging(topicConfigMap)
                .windowed(windows.size(), windows.maintainMs(),
windows.segments, false)
                .build();

        builder.stream(Serdes.String(), valueSerde, "input-topic")
        .groupByKey()
        .count(windows, supplier)


Now as per docs to query the store I would have to use:

String storeName = supplier.name();
ReadOnlyWindowStore<String,Long> localWindowStore =
streams.store(storeName, QueryableStoreTypes.<String,
Long>windowStore());
String key = "some-key";
long fromTime = ...;
long toTime = ...;
WindowStoreIterator<Long> countForWordsForWindows =
localWindowStore.fetch(key, timeFrom, timeTo);


My questions are:

1. Can I run a different java application to query the state store
created by first application.

If yes then how can I refer to the state store?


2. Value in the state store against any given key will keep
incrementing as and when we read new data from the topic for a given
time period.

So at time t say count against k1 is 5 for a given window

If we query that time we get 5, but at time t1 for same key and window
count increases to 10.

If we query that time we get 10.

Question is how do we make sure that we query the state store only
after it has aggregated all the values for a given window?

And is there a way for that java application to run forever (just like
streams application)to keep querying state store and report back the
values.


Thanks

Sachin

Reply via email to