Hi, Shekar, Did you take a look at the stats to see:
1) Is there any incoming messages? 2) Is there any messages in the changelog topic? Could you also try to change the log4j level to DEBUG to see whether we can see something in the log? Thanks! -Yi On Mon, Jul 6, 2015 at 4:43 PM, Shekar Tippur <ctip...@gmail.com> wrote: > Martin, > > As seen below, I have only 1 partition. What else could be wrong? > > *$ *kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser > > Topic:parser PartitionCount:1 ReplicationFactor:1 Configs: > > Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0 > > - Shekar > > > > > On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com> > wrote: > > > Hi Shekar, > > > > The store.all() iterator ought to give you the entire contents of the > > store. However, note that each partition of the input topic results in a > > separate StreamTask instance, which in turn has a separate store. So > there > > will be as many stores as there are input partitions. Perhaps you're not > > seeing data appear because you're writing it in one partition, and trying > > to read it in another. > > > > The partitioning also answers what happens when you run your job on a > > cluster. Different partitions may be processed on different nodes, but > all > > the messages in one input topic partition always go to the same > StreamTask > > (and thus the same store). Thus, whether you have skew or not depends > > entirely on how you partition your input topic. > > > > Regarding atomic deletion: each StreamTask is single-threaded, so you > > don't have to worry about concurrency. If you want to delete all keys in > > the store, you can do so. > > > > Martin > > > > On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote: > > > > > Any answer on how to get all the kv values and reinitialise the kv > store? > > > > > > Had one more question on implementing sliding window. > > > > > > If i use a kv store like rocksdb, and I use yarn (say 3 node cluster), > > the > > > job that it runs to aggregate gets distributed as well and I am > guessing > > > the aggregation numbers get skewed? Is that a right assessment? > > > > > > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com> > wrote: > > > > > >> Also, next.getValue() or next.getKey() does not yield anything. > > >> > > >> KeyValueIterator<String, String> i = store.all(); > > >> > > >> while(i.hasNext()){ > > >> > > >> Entry <String, String> next = i.next(); > > >> > > >> log.info("Removed Key", next.getValue()); > > >> > > >> } > > >> > > >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com> > > wrote: > > >> > > >>> Yi, > > >>> > > >>> There is no exception. I want to do couple of things in the window. > > >>> > > >>> - Get all the keys and values and publish to another store (like > > >>> graphite) as a list > > >>> - Remove all entries. > > >>> > > >>> I can iterate thro the list later but I want to be able to get all kv > > >>> values and delete all of them in an atomic operation. > > >>> > > >>> How do I do these operations on the kv store? > > >>> > > >>> - S > > >>> > > >>> > > >>> > > >>> > > >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote: > > >>> > > >>>> Hi, Shekar, > > >>>> > > >>>> Sorry I was not able to follow up w/ you in time. It is great that > you > > >>>> have > > >>>> found the configure problem and made it work! > > >>>> > > >>>> As for the exception on the iterator, could you send us the log w/ > the > > >>>> exception? > > >>>> > > >>>> Thanks! > > >>>> > > >>>> -Yi > > >>>> > > >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com> > > wrote: > > >>>> > > >>>>> Yi, > > >>>>> > > >>>>> Looks like it is working now. There was a redundant line in the > > config. > > >>>>> > > >>>>> I am able to initialize kv store and add values. > > >>>>> In the window code, I am unable to retrieve them and mark them as > 0. > > >>>>> > > >>>>> Here is my window code: > > >>>>> > > >>>>> public void window(MessageCollector collector, > > >>>>> > > >>>>> TaskCoordinator coordinator) { > > >>>>> > > >>>>> //store.delete(appName); > > >>>>> > > >>>>> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, > > >>>> eventsSeen)); > > >>>>> > > >>>>> KeyValueIterator<String, String> i = store.all(); > > >>>>> > > >>>>> while(i.hasNext()){ > > >>>>> > > >>>>> Entry <String, String> next = i.next(); > > >>>>> > > >>>>> log.info("Trying to remove Key", next.getKey()); > > >>>>> > > >>>>> //i.remove(); > > >>>>> > > >>>>> > > >>>>> > > >>>>> } > > >>>>> > > >>>>> eventsSeen = 0; > > >>>>> > > >>>>> i.close(); > > >>>>> > > >>>>> > > >>>>> > > >>>>> } > > >>>>> > > >>>>> > > >>>>> How do I retrieve the key and is there a way to remove it? i.remove > > >>>> throws > > >>>>> an exception. > > >>>>> > > >>>>> > > >>>>> - Shekar > > >>>>> > > >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com> > > >>>> wrote: > > >>>>> > > >>>>>> Yi, > > >>>>>> > > >>>>>> Here is my config file: > > >>>>>> http://pastebin.com/Kf3C9E0h > > >>>>>> > > >>>>>> - S > > >>>>>> > > >>>>> > > >>>> > > >>> > > >>> > > >> > > > > >