Yi/Milinda,

I am trying to initialize a KV store. I have the following properties
defined:

stores.store-name.key.serde=json
stores.store-name.msg.serde=json
stores.store-name.changelog=argos.windowchangelog

How do I define a key serde? I am getting this exception:

Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
    at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
    at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)

On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Yi,
>
> My use case is more of the latter. Your explanation makes sense now. I was
> also looking into Milinda's wiki. She has a section for Kafka
> partition SimplePartitioner, which is simple enough as well.
>
> Thanks for all the inputs. Let me see what I come up with while
> implementing it.
>
> - Shekar
>
> On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Shekar,
>>
>> First, I would like to clarify what you meant by sliding window: is it
>> defined as windows with size N and advance step size of 1 (which means
>> that windows overlap and each input message would contribute to multiple
>> counts in different windows)? Or windows with size N and advance step size
>> of N (i.e. each incoming message only contributes to one counter in a
>> single window)?
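
To make the two cases concrete, here is a minimal sketch (not Samza code) that lists which windows a message at position t falls into, given a window size and an advance step. The class name `WindowAssignment`, the method `windowStarts`, and the assumption that windows start at non-negative multiples of the step are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignment {
    /**
     * Returns the start of every window [start, start + size) that contains t.
     * Assumption: windows start at 0, step, 2*step, ... and t is non-negative.
     */
    static List<Long> windowStarts(long t, long size, long step) {
        List<Long> starts = new ArrayList<>();
        long latest = t - (t % step); // latest window start that is <= t
        // Walk backwards while the window still covers t and the start is valid.
        for (long s = latest; s > t - size && s >= 0; s -= step) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Step 1 (overlapping windows): one message contributes to several windows.
        System.out.println(windowStarts(7, 5, 1));
        // Step N (non-overlapping windows): one message contributes to exactly one.
        System.out.println(windowStarts(7, 5, 5));
    }
}
```

With step 1 a message lands in up to N windows, which is why that case needs per-window state; with step N it lands in exactly one, so a single counter per key is enough.
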
>>
>> If your use case falls into the first category, you will need something
>> more sophisticated as discussed in SAMZA-552. If your use case is the
>> second one, there could be a simpler version of SAMZA-552 that you can go
>> with:
>>
>> 1) Initiate a KV-store that uses the application name as the key
>> 2) For each incoming message, look up the window that the message belongs
>> to by the application name
>> 3) Update the counter and update the value in the KV-store based on the
>> application name
>> 4) Every 5 min when window() method is triggered, set all counters to zero
>> (this can be done in a lazy way as well, by keeping the last reset
>> timestamp in the record in the KV-store, keyed by application name. Then,
>> resetting the counter to zero can be done the next time the application's
>> counter is updated)
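
The steps above, including the lazy reset, can be sketched roughly as follows. This is only an illustration under stated assumptions: a plain `HashMap` stands in for the Samza KV store, the class `LazyWindowCounter` and its `Entry` record are hypothetical names, and the "last reset timestamp" is stored as the start of the window the count belongs to.

```java
import java.util.HashMap;
import java.util.Map;

class LazyWindowCounter {
    /** Value kept per application: the count and the window it was counted in. */
    static final class Entry {
        final long windowStart; // plays the role of the last reset timestamp
        final long count;
        Entry(long windowStart, long count) {
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    private final long windowMs;
    // Assumption: an in-memory map stands in for the KV store keyed by application name.
    private final Map<String, Entry> store = new HashMap<>();

    LazyWindowCounter(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Counts one message for the application and returns the count in the current window. */
    long increment(String application, long nowMs) {
        long windowStart = nowMs - (nowMs % windowMs);
        Entry previous = store.get(application);
        // Lazy reset: a record from an older window is treated as zero.
        long count = (previous == null || previous.windowStart != windowStart)
                ? 1
                : previous.count + 1;
        store.put(application, new Entry(windowStart, count));
        return count;
    }

    public static void main(String[] args) {
        LazyWindowCounter counter = new LazyWindowCounter(5 * 60 * 1000L); // 5 min windows
        System.out.println(counter.increment("app-a", 1_000L));   // first message in window 0
        System.out.println(counter.increment("app-a", 2_000L));   // same window
        System.out.println(counter.increment("app-a", 301_000L)); // next window, count restarts
    }
}
```

Because the reset happens on the next update, window() does not need to scan and zero every key; the trade-off is that a key that stops receiving messages keeps its stale record until it is read with the same window check.
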
>>
>> Hope that makes sense.
>>
>> -Yi
>>
>> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctip...@gmail.com>
>> wrote:
>>
>> > Benjamin,
>> >
>> > Thanks for the explanation. We don't have any specific partition scheme
>> > as yet. We just have 2 topics - raw and processed - and we use the
>> > default partitioning scheme.
>> > Can you share any code snippet so I can understand it better?
>> >
>> > - Shekar
>> >
>>
>
>
