Hi Shekar,

The store.all() iterator ought to give you the entire contents of the store. 
However, note that each partition of the input topic results in a separate 
StreamTask instance, which in turn has a separate store. So there will be as 
many stores as there are input partitions. Perhaps you're not seeing data 
appear because you're writing it in one partition, and trying to read it in 
another.
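
To make that concrete, here is a rough sketch of the partition-to-store relationship. It is a stand-in model, not Samza code: the class PartitionedStoresSketch and its methods are made up for illustration, and a TreeMap plays the role of each task's store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Stand-in model, not Samza code: one in-memory "store" per input partition.
class PartitionedStoresSketch {
    // partition id -> the private store of the task that owns that partition
    private final Map<Integer, TreeMap<String, String>> stores = new HashMap<>();

    PartitionedStoresSketch(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            stores.put(p, new TreeMap<>());
        }
    }

    // A message arriving on `partition` is handled by that partition's task only,
    // so the write lands in that task's store and nowhere else.
    void process(int partition, String key, String value) {
        stores.get(partition).put(key, value);
    }

    // What the task for `partition` would see when iterating over its own store.
    Map<String, String> all(int partition) {
        return new TreeMap<>(stores.get(partition));
    }

    public static void main(String[] args) {
        PartitionedStoresSketch job = new PartitionedStoresSketch(2);
        job.process(0, "app-a", "5");
        System.out.println("task 0 sees " + job.all(0));
        System.out.println("task 1 sees " + job.all(1));
    }
}
```

In this model the task for partition 1 sees an empty store even though partition 0 has data, which would match the symptom you describe.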

The partitioning also answers what happens when you run your job on a cluster. 
Different partitions may be processed on different nodes, but all the messages 
in one input topic partition always go to the same StreamTask (and thus the 
same store). Thus, whether you have skew or not depends entirely on how you 
partition your input topic.
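
As an illustration of how the partitioning key drives skew, here is a small sketch. The hash-modulo partitioner below is an assumption made for the example (your producer's actual partitioner may differ), and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: shows that all messages with one key land on one partition,
// so a "hot" key concentrates load on a single task.
class PartitionSkewSketch {
    // Hypothetical partitioner: hash of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Number of messages each partition (and therefore each task) would receive.
    static int[] load(List<String> messageKeys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : messageKeys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("app-a", "app-a", "app-a", "app-b");
        System.out.println(Arrays.toString(load(keys, 3)));
    }
}
```

Because every "app-a" message maps to the same partition, the aggregate for that key is complete within one store; the cost is that the task owning that partition does most of the work.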

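Regarding atomic deletion: each StreamTask is single-threaded, so nothing else 
reads or writes the store while your window() method runs, and you don't have 
to worry about concurrency. If you want to delete all keys in the store, you 
can do so by iterating over store.all() and calling store.delete(key) for each 
key (rather than i.remove(), which you found throws an exception).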
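
For the "get everything, then clear" case, a two-pass approach is one option: copy the entries out first, then delete the copied keys. The sketch below is a stand-in, not Samza code: a TreeMap plays the role of the store, and DrainStoreSketch and drain are made-up names. In a real task the first loop would walk store.all() (closing the iterator afterwards) and the second would call store.delete(key).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in sketch: a TreeMap models one task's key-value store.
class DrainStoreSketch {
    // Copies every entry out of the store, then deletes the copied keys.
    // Safe without locks only because a single thread owns the store.
    static List<Map.Entry<String, String>> drain(TreeMap<String, String> store) {
        List<Map.Entry<String, String>> snapshot = new ArrayList<>();

        // Pass 1: read everything (in Samza: iterate store.all(), then close it).
        for (Map.Entry<String, String> e : store.entrySet()) {
            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }

        // Pass 2: delete by key once iteration has finished (in Samza: store.delete(key)).
        for (Map.Entry<String, String> e : snapshot) {
            store.remove(e.getKey());
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("app-a", "5");
        store.put("app-b", "7");
        List<Map.Entry<String, String>> drained = drain(store);
        System.out.println("drained " + drained + ", remaining " + store);
    }
}
```

The returned list is what you would publish to graphite; deleting only after the iteration is done avoids modifying the store while an iterator is open.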

Martin

On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:

> Any answer on how to get all the kv values and reinitialise the kv store?
> 
> Had one more question on implementing sliding window.
> 
> If I use a kv store like rocksdb, and I use yarn (say a 3 node cluster), the
> job that it runs to aggregate gets distributed as well, and I am guessing
> the aggregation numbers get skewed? Is that the right assessment?
> 
> On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> 
>> Also, next.getValue() or next.getKey() does not yield anything.
>> 
>>   KeyValueIterator<String, String> i = store.all();
>>   while (i.hasNext()) {
>>     Entry<String, String> next = i.next();
>>     log.info("Removed Key", next.getValue());
>>   }
>> 
>> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> 
>>> Yi,
>>> 
>>> There is no exception. I want to do a couple of things in the window.
>>> 
>>> - Get all the keys and values and publish to another store (like
>>> graphite) as a list
>>> - Remove all entries.
>>> 
>>> I can iterate through the list later, but I want to be able to get all kv
>>> values and delete all of them in an atomic operation.
>>> 
>>> How do I do these operations on the kv store?
>>> 
>>> - S
>>> 
>>> 
>>> 
>>> 
>>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote:
>>> 
>>>> Hi, Shekar,
>>>> 
>>>> Sorry I was not able to follow up w/ you in time. It is great that you
>>>> have found the configuration problem and made it work!
>>>> 
>>>> As for the exception on the iterator, could you send us the log w/ the
>>>> exception?
>>>> 
>>>> Thanks!
>>>> 
>>>> -Yi
>>>> 
>>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>>> 
>>>>> Yi,
>>>>> 
>>>>> Looks like it is working now. There was a redundant line in the config.
>>>>> 
>>>>> I am able to initialize kv store and add values.
>>>>> In the window code, I am unable to retrieve them and mark them as 0.
>>>>> 
>>>>> Here is my window code:
>>>>> 
>>>>> public void window(MessageCollector collector,
>>>>>     TaskCoordinator coordinator) {
>>>>>   //store.delete(appName);
>>>>>   collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
>>>>>   KeyValueIterator<String, String> i = store.all();
>>>>>   while (i.hasNext()) {
>>>>>     Entry<String, String> next = i.next();
>>>>>     log.info("Trying to remove Key", next.getKey());
>>>>>     //i.remove();
>>>>>   }
>>>>>   eventsSeen = 0;
>>>>>   i.close();
>>>>> }
>>>>> 
>>>>> 
>>>>> How do I retrieve the key and is there a way to remove it? i.remove
>>>>> throws an exception.
>>>>> 
>>>>> 
>>>>> - Shekar
>>>>> 
>>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>>>> 
>>>>>> Yi,
>>>>>> 
>>>>>> Here is my config file:
>>>>>> http://pastebin.com/Kf3C9E0h
>>>>>> 
>>>>>> - S
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
