It might be easier if you make a github gist with your code. It is quite difficult to see what is happening in an email.
Cheers, Damian On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com> wrote: > Thanks a lot Damien. > I am able to get to see if the join worked (using foreach). I tried to add > the logic to query the store after starting the streams: > Looks like the code is not getting there. Here is the modified code: > > KafkaStreams streams = new KafkaStreams(builder, props); > > streams.start(); > > > parser.foreach(new ForeachAction<String, JsonNode>() { > @Override > public void apply(String key, JsonNode value) { > System.out.println(key + ": " + value); > if (value == null){ > System.out.println("null match"); > ReadOnlyKeyValueStore<String, Long> keyValueStore = > null; > try { > keyValueStore = > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store", > QueryableStoreTypes.keyValueStore(), streams); > } catch (InterruptedException e) { > e.printStackTrace(); > } > > KeyValueIterator kviterator = > keyValueStore.range("test_nod","test_node"); > } > } > }); > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <damian....@gmail.com> wrote: > > > Hi, > > The store won't be queryable until after you have called streams.start(). > > No stores have been created until the application is up and running and > > they are dependent on the underlying partitions. > > > > To check that a stateful operation has produced a result you would > normally > > add another operation after the join, i.e., > > stream.join(other,...).foreach(..) or stream.join(other,...).to("topic") > > > > Thanks, > > Damian > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com> wrote: > > > > > One more thing.. How do we check if the stateful join operation > resulted > > in > > > a kstream of some value in it (size of kstream)? How do we check the > > > content of a kstream? > > > > > > - S > > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com> > > wrote: > > > > > > > Damien, > > > > > > > > Thanks a lot for pointing out. > > > > > > > > I got a little further. I am kind of stuck with the sequencing. > Couple > > of > > > > issues: > > > > 1. I cannot initialise KafkaStreams before the parser.to(). > > > > 2. Do I need to create a new KafkaStreams object when I create a > > > > KeyValueStore? > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I > seem > > to > > > > get a error when I try: > > > > *KeyValueIterator <String,JsonNode> kviterator > > > > = keyValueStore.range("test_nod","test_node");* > > > > > > > > /////// START CODE ///////// > > > > //parser is a kstream as a result of join > > > > if (parser.toString().matches("null")){ > > > > > > > > ReadOnlyKeyValueStore<String, Long> keyValueStore = > > > > null; > > > > KafkaStreams newstreams = new KafkaStreams(builder, props); > > > > try { > > > > keyValueStore = > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store", > > > > QueryableStoreTypes.keyValueStore(), newstreams); > > > > } catch (InterruptedException e) { > > > > e.printStackTrace(); > > > > } > > > > * KeyValueIterator kviterator > > > > = keyValueStore.range("test_nod","test_node");* > > > > }else { > > > > > > > > * parser.to <http://parser.to>(stringSerde, jsonSerde, > "parser");*} > > > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);* > > > > streams.start(); > > > > > > > > /////// END CODE ///////// > > > > > > > > - S > > > > > > > > > > > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com> > > > wrote: > > > > > > > > > > It is part of the ReadOnlyKeyValueStore interface: > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/ > > > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java > > > > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ctip...@gmail.com> > > wrote: > > > > > > > > > > > That's cool. This feature is a part of rocksdb object and not > > ktable? > > > > > > > > > > > > Sent from my iPhone > > > > > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <damian....@gmail.com> > > > wrote: > > > > > > > > > > > > > > Yes they can be strings, > > > > > > > > > > > > > > so you could do something like: > > > > > > > store.range("test_host", "test_hosu"); > > > > > > > > > > > > > > This would return an iterator containing all of the values > > > > (inclusive) > > > > > > from > > > > > > > "test_host" -> "test_hosu". > > > > > > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com > > > > > > wrote: > > > > > > >> > > > > > > >> Can you please point me to an example? Can from and to be a > > > string? > > > > > > >> > > > > > > >> Sent from my iPhone > > > > > > >> > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <damian....@gmail.com> > > > > wrote: > > > > > > >>> > > > > > > >>> Hi, > > > > > > >>> > > > > > > >>> You can't use a regex, but you could use a range query. > > > > > > >>> i.e, keyValueStore.range(from, to) > > > > > > >>> > > > > > > >>> Thanks, > > > > > > >>> Damian > > > > > > >>> > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur < > ctip...@gmail.com > > > > > > > wrote: > > > > > > >>>> > > > > > > >>>> Hello, > > > > > > >>>> > > > > > > >>>> I am able to get the kstream to ktable join work. I have > some > > > use > > > > > > cases > > > > > > >>>> where the key is not always a exact match. > > > > > > >>>> I was wondering if there is a way to lookup keys based on > > regex. > > > > > > >>>> > > > > > > >>>> For example, > > > > > > >>>> I have these entries for a ktable: > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" } > > > > > > >>>> > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" } > > > > > > >>>> > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" } > > > > > > >>>> > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" } > > > > > > >>>> > > > > > > >>>> and this for a kstream: > > > > > > >>>> > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": { > > > > > > >> "creation_time ": > > > > > > >>>> "1234 " } } } > > > > > > >>>> > > > > > > >>>> In this case, if the exact match does not work, I would like > > to > > > > lookup > > > > > > >>>> ktable for all entries that contains "test_host*" in it and > > have > > > > > > >>>> application logic to determine what would be the best fit. > > > > > > >>>> > > > > > > >>>> Appreciate input. > > > > > > >>>> > > > > > > >>>> - Shekar > > > > > > >>>> > > > > > > >> > > > > > > > > > > > > > > > >