It might be easier if you make a github gist with your code. It is quite
difficult to see what is happening in an email.

Cheers,
Damian
On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com> wrote:

> Thanks a lot Damien.
> I am able to get to see if the join worked (using foreach). I tried to add
> the logic to query the store after starting the streams:
> Looks like the code is not getting there. Here is the modified code:
>
> KafkaStreams streams = new KafkaStreams(builder, props);
>
> streams.start();
>
>
> parser.foreach(new ForeachAction<String, JsonNode>() {
>     @Override
>     public void apply(String key, JsonNode value) {
>         System.out.println(key + ": " + value);
>         if (value == null){
>             System.out.println("null match");
>             ReadOnlyKeyValueStore<String, Long> keyValueStore =
>                     null;
>             try {
>                 keyValueStore =
> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> QueryableStoreTypes.keyValueStore(), streams);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>
>             KeyValueIterator  kviterator =
> keyValueStore.range("test_nod","test_node");
>         }
>     }
> });
>
>
> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi,
> > The store won't be queryable until after you have called streams.start().
> > No stores have been created until the application is up and running and
> > they are dependent on the underlying partitions.
> >
> > To check that a stateful operation has produced a result you would
> normally
> > add another operation after the join, i.e.,
> > stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
> >
> > Thanks,
> > Damian
> >
> > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com> wrote:
> >
> > > One more thing.. How do we check if the stateful join operation
> resulted
> > in
> > > a kstream of some value in it (size of kstream)? How do we check the
> > > content of a kstream?
> > >
> > > - S
> > >
> > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com>
> > wrote:
> > >
> > > > Damien,
> > > >
> > > > Thanks a lot for pointing out.
> > > >
> > > > I got a little further. I am kind of stuck with the sequencing.
> Couple
> > of
> > > > issues:
> > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > KeyValueStore?
> > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I
> seem
> > to
> > > > get a error when I try:
> > > > *KeyValueIterator <String,JsonNode> kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > >
> > > > /////// START CODE /////////
> > > > //parser is a kstream as a result of join
> > > > if (parser.toString().matches("null")){
> > > >
> > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> > > >             null;
> > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > >     try {
> > > >         keyValueStore =
> > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > >     } catch (InterruptedException e) {
> > > >         e.printStackTrace();
> > > >     }
> > > > *    KeyValueIterator kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > > }else {
> > > >
> > > > *    parser.to <http://parser.to>(stringSerde, jsonSerde,
> "parser");*}
> > > >
> > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > streams.start();
> > > >
> > > > /////// END CODE /////////
> > > >
> > > > - S
> > > >
> > > >
> > > >
> > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com>
> > > wrote:
> > > > >
> > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > > >
> > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ctip...@gmail.com>
> > wrote:
> > > > >
> > > > > > That's cool. This feature is a part of rocksdb object and not
> > ktable?
> > > > > >
> > > > > > Sent from my iPhone
> > > > > >
> > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <damian....@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Yes they can be strings,
> > > > > > >
> > > > > > > so you could do something like:
> > > > > > > store.range("test_host", "test_hosu");
> > > > > > >
> > > > > > > This would return an iterator containing all of the values
> > > > (inclusive)
> > > > > > from
> > > > > > > "test_host" -> "test_hosu".
> > > > > > >
> > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com
> >
> > > > wrote:
> > > > > > >>
> > > > > > >> Can you please point me to an example? Can from and to be a
> > > string?
> > > > > > >>
> > > > > > >> Sent from my iPhone
> > > > > > >>
> > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <damian....@gmail.com>
> > > > wrote:
> > > > > > >>>
> > > > > > >>> Hi,
> > > > > > >>>
> > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Damian
> > > > > > >>>
> > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
> ctip...@gmail.com
> > >
> > > > wrote:
> > > > > > >>>>
> > > > > > >>>> Hello,
> > > > > > >>>>
> > > > > > >>>> I am able to get the kstream to ktable join work. I have
> some
> > > use
> > > > > > cases
> > > > > > >>>> where the key is not always a exact match.
> > > > > > >>>> I was wondering if there is a way to lookup keys based on
> > regex.
> > > > > > >>>>
> > > > > > >>>> For example,
> > > > > > >>>> I have these entries for a ktable:
> > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > > >>>>
> > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > > >>>>
> > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > > >>>>
> > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > >>>>
> > > > > > >>>> and this for a kstream:
> > > > > > >>>>
> > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": {
> > > > > > >> "creation_time ":
> > > > > > >>>> "1234 " } } }
> > > > > > >>>>
> > > > > > >>>> In this case, if the exact match does not work, I would like
> > to
> > > > lookup
> > > > > > >>>> ktable for all entries that contains "test_host*" in it and
> > have
> > > > > > >>>> application logic to determine what would be the best fit.
> > > > > > >>>>
> > > > > > >>>> Appreciate input.
> > > > > > >>>>
> > > > > > >>>> - Shekar
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > >
> > >
> >
>

Reply via email to