Sorry, that was a copy-paste issue. This is what I have:

stores.store.key.serde=org.apache.samza.serializers.JsonSerdeFactory
stores.store.msg.serde=org.apache.samza.serializers.JsonSerdeFactory
stores.store.changelog=argos.windowchangelog

and this is how I am initializing it:

private KeyValueStore<String, Integer> store;

On Wed, Jul 1, 2015 at 3:55 PM, Yan Fang <yanfang...@gmail.com> wrote:
> So do you use "store-name" as the KV store name in your StreamTask
> code?
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Yan,
> >
> > Yes, I do have it.
> >
> > - Shekar
> >
> > On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > Do you have
> > >
> > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > >
> > > in your config file?
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > Yi/Milinda,
> > > >
> > > > I am trying to initialize a KV store. I have the following properties
> > > > defined:
> > > >
> > > > stores.store-name.key.serde=json
> > > > stores.store-name.msg.serde=json
> > > > stores.store-name.changelog=argos.windowchangelog
> > > >
> > > > How do I define a key serde? I am getting this exception:
> > > >
> > > > Exception in thread "main" org.apache.samza.SamzaException: Must define a
> > > > key serde when using key value storage.
> > > >     at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> > > >     at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> > > >     at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> > > >     at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > >     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > > >     at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> > > >     at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > >     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> > > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > >     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > > >     at scala.collection.SetLike$class.map(SetLike.scala:93)
> > > >     at scala.collection.AbstractSet.map(Set.scala:47)
> > > >     at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> > > >     at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> > > >     at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> > > >     at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> > > >     at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> > > >
> > > > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> > > >
> > > > > Yi,
> > > > >
> > > > > My use case is more of the latter. Your explanation makes sense now. I was
> > > > > also looking into Milinda's wiki. She has a section on Kafka partitioning
> > > > > with SimplePartitioner, which is simple enough as well.
> > > > >
> > > > > Thanks for all the inputs. Let me see what I come up with while
> > > > > implementing it.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpa...@gmail.com> wrote:
> > > > >
> > > > > > Hi, Shekar,
> > > > > >
> > > > > > First, I would like to clarify what you meant by sliding window: is it
> > > > > > defined as windows with size N and an advance step of 1 (which means
> > > > > > that windows overlap and each input message contributes to multiple
> > > > > > counts in different windows)? Or windows with size N and an advance
> > > > > > step of N (i.e. each incoming message contributes to only one counter
> > > > > > in a single window)?
> > > > > >
> > > > > > If your use case falls into the first category, you will need
> > > > > > something more sophisticated, as discussed in SAMZA-552.
> > > > > > If your use case is the second one, there could be a simpler version
> > > > > > of SAMZA-552 that you can go with:
> > > > > >
> > > > > > 1) Create a KV store that uses the application name as the key.
> > > > > > 2) For each incoming message, look up the window counter the message
> > > > > >    belongs to by its application name.
> > > > > > 3) Update the counter and write the new value back to the KV store,
> > > > > >    keyed by the application name.
> > > > > > 4) Every 5 min, when the window() method is triggered, set all counters
> > > > > >    to zero. (This can also be done lazily: keep the last reset timestamp
> > > > > >    in the record stored in the KV store, keyed by application name.
> > > > > >    Then the counter can be reset to zero the next time that
> > > > > >    application's counter is updated.)
> > > > > >
> > > > > > Hope that makes sense.
> > > > > >
> > > > > > -Yi
> > > > > >
> > > > > > On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > >
> > > > > > > Benjamin,
> > > > > > >
> > > > > > > Thanks for the explanation. We don't have any specific partition
> > > > > > > scheme yet. We just have 2 topics, raw and processed, and we use the
> > > > > > > default partitioning scheme.
> > > > > > > Can you share any code snippet so I can understand it better?
> > > > > > >
> > > > > > > - Shekar
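On the "Must define a key serde" question at the top of this thread: assuming the Samza convention that `stores.<store>.key.serde` and `stores.<store>.msg.serde` hold a serde *name* (e.g. `json`) which must also be registered as `serializers.registry.<name>.class`, and that `<store>` must be the same name the task later passes to `context.getStore(...)`, the two things worth checking are the store name and the serde name. The hypothetical checker below (`SerdeConfigCheck` is a made-up name, not part of Samza, and it does not reproduce Samza's own error messages) walks that lookup with plain `java.util.Properties`, using the config from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, NOT part of Samza: checks that a store's key/msg serde
// names resolve through stores.<store>.{key,msg}.serde -> serializers.registry.<name>.class
public class SerdeConfigCheck {

    /** Returns human-readable problems; an empty list means both serdes resolve. */
    public static List<String> check(Properties config, String storeName) {
        List<String> problems = new ArrayList<>();
        for (String kind : new String[] {"key", "msg"}) {
            String serdeKey = "stores." + storeName + "." + kind + ".serde";
            String serdeName = config.getProperty(serdeKey);
            if (serdeName == null) {
                // Nothing configured for this store name (e.g. "store" vs "store-name").
                problems.add("missing " + serdeKey);
                continue;
            }
            String registryKey = "serializers.registry." + serdeName + ".class";
            if (config.getProperty(registryKey) == null) {
                // The value is treated as a serde name, so it must be registered.
                problems.add(serdeKey + "=" + serdeName + " but no " + registryKey);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Config as posted at the top of the thread: factory class used as the serde name.
        Properties posted = new Properties();
        posted.setProperty("stores.store.key.serde", "org.apache.samza.serializers.JsonSerdeFactory");
        posted.setProperty("stores.store.msg.serde", "org.apache.samza.serializers.JsonSerdeFactory");
        posted.setProperty("stores.store.changelog", "argos.windowchangelog");

        // Same store, serde referenced by name and registered once (assumed convention).
        Properties byName = new Properties();
        byName.setProperty("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
        byName.setProperty("stores.store.key.serde", "json");
        byName.setProperty("stores.store.msg.serde", "json");
        byName.setProperty("stores.store.changelog", "argos.windowchangelog");

        System.out.println(check(posted, "store"));      // two unresolved serde names
        System.out.println(check(byName, "store"));      // []
        System.out.println(check(byName, "store-name")); // store-name mismatch: both serdes missing
    }
}
```

If the checker comes back clean, the remaining thing to confirm is that the task's `init` looks the store up under the same name, presumably something like `store = (KeyValueStore<String, Integer>) context.getStore("store");`.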
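Yi's second option (window size N with advance step N, so each message lands in exactly one window) with the lazy reset from step 4 can be sketched roughly as follows. This is an illustration under stated assumptions, not code from the thread: a `HashMap` stands in for the Samza `KeyValueStore` keyed by application name, `TumblingAppCounter` and its methods are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of Yi's "simpler" option: non-overlapping (tumbling) windows,
// one counter per application name, reset lazily on the next update.
// A HashMap stands in for the Samza KeyValueStore keyed by application name.
public class TumblingAppCounter {

    /** Stored value per application: the count and the start (last reset) of its window. */
    static final class Entry {
        final long windowStartMs;
        final int count;

        Entry(long windowStartMs, int count) {
            this.windowStartMs = windowStartMs;
            this.count = count;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();
    private final long windowMs;

    public TumblingAppCounter(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Start of the window containing timestampMs, aligned to multiples of windowMs. */
    private long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowMs);
    }

    /** Steps 2-3 plus the lazy reset from step 4; returns the updated count. */
    public int increment(String appName, long timestampMs) {
        long start = windowStart(timestampMs);
        Entry old = store.get(appName);
        // A record left over from an earlier window counts as zero (lazy reset).
        int base = (old == null || old.windowStartMs != start) ? 0 : old.count;
        Entry updated = new Entry(start, base + 1);
        store.put(appName, updated);
        return updated.count;
    }

    /** Count for appName in the window containing timestampMs (0 if stale or absent). */
    public int get(String appName, long timestampMs) {
        Entry e = store.get(appName);
        return (e == null || e.windowStartMs != windowStart(timestampMs)) ? 0 : e.count;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        TumblingAppCounter counter = new TumblingAppCounter(fiveMinutes);
        counter.increment("app-a", 1_000L);
        counter.increment("app-a", 2_000L);
        counter.increment("app-b", 3_000L);
        System.out.println(counter.get("app-a", 4_000L)); // 2
        // First message after the 5-minute boundary starts a fresh count.
        System.out.println(counter.increment("app-a", fiveMinutes + 10L)); // 1
        System.out.println(counter.get("app-b", fiveMinutes + 10L)); // 0
    }
}
```

Aligning windows to multiples of the window length makes every application's counter roll over at the same instants, which matches the eager variant where all counters are zeroed from window() every 5 minutes; keying the reset off each application's first message instead would give every application its own window phase.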