Hi,
The store won't be queryable until after you have called streams.start().
Stores are not created until the application is up and running, because
they depend on the underlying partitions.

To check that a stateful operation has produced a result you would normally
add another operation after the join, e.g.,
stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")

Thanks,
Damian

On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com> wrote:

> One more thing: how do we check whether the stateful join operation
> produced a kstream with any values in it (i.e. the size of the kstream)?
> How do we check the contents of a kstream?
>
> - S
>
> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Damian,
> >
> > Thanks a lot for pointing that out.
> >
> > I got a little further, but I am kind of stuck with the sequencing. A
> > couple of issues:
> > 1. I cannot initialise KafkaStreams before the parser.to().
> > 2. Do I need to create a new KafkaStreams object when I create a
> > KeyValueStore?
> > 3. How do I initialize a KeyValueIterator with <String, JsonNode>? I seem
> > to get an error when I try:
> > KeyValueIterator <String,JsonNode> kviterator
> > = keyValueStore.range("test_nod","test_node");
> >
> > /////// START CODE /////////
> > //parser is a kstream as a result of join
> > if (parser.toString().matches("null")){
> >
> >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >             null;
> >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> >     try {
> >         keyValueStore =
> >             IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >                 QueryableStoreTypes.keyValueStore(), newstreams);
> >     } catch (InterruptedException e) {
> >         e.printStackTrace();
> >     }
> >     KeyValueIterator kviterator
> >             = keyValueStore.range("test_nod","test_node");
> > }else {
> >
> >     parser.to(stringSerde, jsonSerde, "parser");}
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> > /////// END CODE /////////
> >
> > - S
> >
> >
> >
> > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > It is part of the ReadOnlyKeyValueStore interface:
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > >
> > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > That's cool. So this feature is part of the rocksdb object and not the ktable?
> > > >
> > > > > On Jul 27, 2017, at 07:57, Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > Yes they can be strings,
> > > > >
> > > > > so you could do something like:
> > > > > store.range("test_host", "test_hosu");
> > > > >
> > > > > > This would return an iterator containing all of the values
> > > > > > (inclusive) from "test_host" -> "test_hosu".
> > > > >
> > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > >>
> > > > > >> Can you please point me to an example? Can from and to be a string?
> > > > >>
> > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <damian....@gmail.com> wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > > >>> You can't use a regex, but you could use a range query,
> > > > > >>> i.e., keyValueStore.range(from, to)
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Damian
> > > > >>>
> > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > >>>>
> > > > >>>> Hello,
> > > > >>>>
> > > > > >>>> I am able to get the kstream to ktable join to work. I have some
> > > > > >>>> use cases where the key is not always an exact match.
> > > > > >>>> I was wondering if there is a way to look up keys based on regex.
> > > > >>>>
> > > > >>>> For example,
> > > > >>>> I have these entries for a ktable:
> > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > >>>>
> > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > >>>>
> > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > >>>>
> > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > >>>>
> > > > >>>> and this for a kstream:
> > > > >>>>
> > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ": "1234 " } } }
> > > > >>>>
> > > > > >>>> In this case, if the exact match does not work, I would like to
> > > > > >>>> look up the ktable for all entries that contain "test_host*" and
> > > > > >>>> have application logic determine what would be the best fit.
> > > > >>>>
> > > > >>>> Appreciate input.
> > > > >>>>
> > > > >>>> - Shekar
> > > > >>>>
> > > > >>
> > > >
> >
>